Compare commits
No commits in common. "dfe65001417aa9f13647764ead87d674335e7677" and "87673507d86e693206da817fdc9e5174e0ba395a" have entirely different histories.
dfe6500141
...
87673507d8
4
.gitignore
vendored
4
.gitignore
vendored
@ -30,6 +30,6 @@ Thumbs.db
|
|||||||
gui/__pycache__
|
gui/__pycache__
|
||||||
__pycache__
|
__pycache__
|
||||||
|
|
||||||
|
Testfiles
|
||||||
Testfiles/TestOutputs
|
Testfiles/
|
||||||
Testfiles_
|
Testfiles_
|
||||||
|
|||||||
@ -1,112 +0,0 @@
|
|||||||
# Plan for Autotest GUI Mode Implementation
|
|
||||||
|
|
||||||
**I. Objective:**
|
|
||||||
Create an `autotest.py` script that can launch the Asset Processor GUI headlessly, load a predefined asset (`.zip`), select a predefined preset, verify the predicted rule structure against an expected JSON, trigger processing to a predefined output directory, check the output, and analyze logs for errors or specific messages. This serves as a sanity check for core GUI-driven workflows.
|
|
||||||
|
|
||||||
**II. `TestFiles` Directory:**
|
|
||||||
A new directory named `TestFiles` will be created in the project root (`c:/Users/Theis/Assetprocessor/Asset-Frameworker/TestFiles/`). This directory will house:
|
|
||||||
* Sample asset `.zip` files for testing (e.g., `TestFiles/SampleAsset1.zip`).
|
|
||||||
* Expected rule structure JSON files (e.g., `TestFiles/SampleAsset1_PresetX_expected_rules.json`).
|
|
||||||
* A subdirectory for test outputs (e.g., `TestFiles/TestOutputs/`).
|
|
||||||
|
|
||||||
**III. `autotest.py` Script:**
|
|
||||||
|
|
||||||
1. **Location:** `c:/Users/Theis/Assetprocessor/Asset-Frameworker/autotest.py` (or `scripts/autotest.py`).
|
|
||||||
2. **Command-Line Arguments (with defaults pointing to `TestFiles/`):**
|
|
||||||
* `--zipfile`: Path to the test asset. Default: `TestFiles/default_test_asset.zip`.
|
|
||||||
* `--preset`: Name of the preset. Default: `DefaultTestPreset`.
|
|
||||||
* `--expectedrules`: Path to expected rules JSON. Default: `TestFiles/default_test_asset_rules.json`.
|
|
||||||
* `--outputdir`: Path for processing output. Default: `TestFiles/TestOutputs/DefaultTestOutput`.
|
|
||||||
* `--search` (optional): Log search term. Default: `None`.
|
|
||||||
* `--additional-lines` (optional): Context lines for log search. Default: `0`.
|
|
||||||
3. **Core Structure:**
|
|
||||||
* Imports necessary modules from the main application and PySide6.
|
|
||||||
* Adds project root to `sys.path` for imports.
|
|
||||||
* `AutoTester` class:
|
|
||||||
* **`__init__(self, app_instance: App)`:**
|
|
||||||
* Stores `app_instance` and `main_window`.
|
|
||||||
* Initializes `QEventLoop`.
|
|
||||||
* Connects `app_instance.all_tasks_finished` to `self._on_all_tasks_finished`.
|
|
||||||
* Loads expected rules from the `--expectedrules` file.
|
|
||||||
* **`run_test(self)`:** Orchestrates the test steps sequentially:
|
|
||||||
1. Load ZIP (`main_window.add_input_paths()`).
|
|
||||||
2. Select Preset (`main_window.preset_editor_widget.editor_preset_list.setCurrentItem()`).
|
|
||||||
3. Await Prediction (using `QTimer` to poll `main_window._pending_predictions`, manage with `QEventLoop`).
|
|
||||||
4. Retrieve & Compare Rulelist:
|
|
||||||
* Get actual rules: `main_window.unified_model.get_all_source_rules()`.
|
|
||||||
* Convert actual rules to comparable dict (`_convert_rules_to_comparable()`).
|
|
||||||
* Compare with loaded expected rules (`_compare_rules()`). If mismatch, log and fail.
|
|
||||||
5. Start Processing (emit `main_window.start_backend_processing` with rules and output settings).
|
|
||||||
6. Await Processing (use `QEventLoop` waiting for `_on_all_tasks_finished`).
|
|
||||||
7. Check Output Path (verify existence of output dir, list contents, basic sanity checks like non-emptiness or presence of key asset folders).
|
|
||||||
8. Retrieve & Analyze Logs (`main_window.log_console.log_console_output.toPlainText()`, filter by `--search`, check for tracebacks).
|
|
||||||
9. Report result and call `cleanup_and_exit()`.
|
|
||||||
* **`_check_prediction_status(self)`:** Slot for prediction polling timer.
|
|
||||||
* **`_on_all_tasks_finished(self, processed_count, skipped_count, failed_count)`:** Slot for `App.all_tasks_finished` signal.
|
|
||||||
* **`_convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> dict`:** Converts `SourceRule` objects to the JSON structure defined below.
|
|
||||||
* **`_compare_rules(self, actual_rules_data: dict, expected_rules_data: dict) -> bool`:** Implements Option 1 comparison logic:
|
|
||||||
* Errors if an expected field is missing or its value mismatches.
|
|
||||||
* Logs (but doesn't error on) fields present in actual but not in expected.
|
|
||||||
* **`_process_and_display_logs(self, logs_text: str)`:** Handles log filtering/display.
|
|
||||||
* **`cleanup_and_exit(self, success=True)`:** Quits `QCoreApplication` and `sys.exit()`.
|
|
||||||
* `main()` function:
|
|
||||||
* Parses CLI arguments.
|
|
||||||
* Initializes `QApplication`.
|
|
||||||
* Instantiates `main.App()` (does *not* show the GUI).
|
|
||||||
* Instantiates `AutoTester(app_instance)`.
|
|
||||||
* Uses `QTimer.singleShot(0, tester.run_test)` to start the test.
|
|
||||||
* Runs `q_app.exec()`.
|
|
||||||
|
|
||||||
**IV. `expected_rules.json` Structure (Revised):**
|
|
||||||
Located in `TestFiles/`. Example: `TestFiles/SampleAsset1_PresetX_expected_rules.json`.
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"source_rules": [
|
|
||||||
{
|
|
||||||
"input_path": "SampleAsset1.zip",
|
|
||||||
"supplier_identifier": "ExpectedSupplier",
|
|
||||||
"preset_name": "PresetX",
|
|
||||||
"assets": [
|
|
||||||
{
|
|
||||||
"asset_name": "AssetNameFromPrediction",
|
|
||||||
"asset_type": "Prop",
|
|
||||||
"files": [
|
|
||||||
{
|
|
||||||
"file_path": "relative/path/to/file1.png",
|
|
||||||
"item_type": "MAP_COL",
|
|
||||||
"target_asset_name_override": null
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
**V. Mermaid Diagram of Autotest Flow:**
|
|
||||||
```mermaid
|
|
||||||
graph TD
|
|
||||||
A[Start autotest.py with CLI Args (defaults to TestFiles/)] --> B{Setup Args & Logging};
|
|
||||||
B --> C[Init QApplication & main.App (GUI Headless)];
|
|
||||||
C --> D[Instantiate AutoTester(app_instance)];
|
|
||||||
D --> E[QTimer.singleShot -> AutoTester.run_test()];
|
|
||||||
|
|
||||||
subgraph AutoTester.run_test()
|
|
||||||
E --> F[Load Expected Rules from --expectedrules JSON];
|
|
||||||
F --> G[Load ZIP (--zipfile) via main_window.add_input_paths()];
|
|
||||||
G --> H[Select Preset (--preset) via main_window.preset_editor_widget];
|
|
||||||
H --> I[Await Prediction (Poll main_window._pending_predictions via QTimer & QEventLoop)];
|
|
||||||
I -- Prediction Done --> J[Get Actual Rules from main_window.unified_model];
|
|
||||||
J --> K[Convert Actual Rules to Comparable JSON Structure];
|
|
||||||
K --> L{Compare Actual vs Expected Rules (Option 1 Logic)};
|
|
||||||
L -- Match --> M[Start Processing (Emit main_window.start_backend_processing with --outputdir)];
|
|
||||||
L -- Mismatch --> ZFAIL[Log Mismatch & Call cleanup_and_exit(False)];
|
|
||||||
M --> N[Await Processing (QEventLoop for App.all_tasks_finished signal)];
|
|
||||||
N -- Processing Done --> O[Check Output Dir (--outputdir): Exists? Not Empty? Key Asset Folders?];
|
|
||||||
O --> P[Retrieve & Analyze Logs (Search, Tracebacks)];
|
|
||||||
P --> Q[Log Test Success & Call cleanup_and_exit(True)];
|
|
||||||
end
|
|
||||||
|
|
||||||
ZFAIL --> ZEND[AutoTester.cleanup_and_exit() -> QCoreApplication.quit() & sys.exit()];
|
|
||||||
Q --> ZEND;
|
|
||||||
@ -1,83 +0,0 @@
|
|||||||
# User Guide: Usage - Automated GUI Testing (`autotest.py`)
|
|
||||||
|
|
||||||
This document explains how to use the `autotest.py` script for automated sanity checks of the Asset Processor Tool's GUI-driven workflow.
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
|
|
||||||
The `autotest.py` script provides a way to run predefined test scenarios headlessly (without displaying the GUI). It simulates the core user actions: loading an asset, selecting a preset, allowing rules to be predicted, processing the asset, and then checks the results against expectations. This is primarily intended as a developer tool for regression testing and ensuring core functionality remains stable.
|
|
||||||
|
|
||||||
## Running the Autotest Script
|
|
||||||
|
|
||||||
From the project root directory, you can run the script using Python:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
python autotest.py [OPTIONS]
|
|
||||||
```
|
|
||||||
|
|
||||||
### Command-Line Options
|
|
||||||
|
|
||||||
The script accepts several command-line arguments to configure the test run. If not provided, they use predefined default values.
|
|
||||||
|
|
||||||
* `--zipfile PATH_TO_ZIP`:
|
|
||||||
* Specifies the path to the input asset `.zip` file to be used for the test.
|
|
||||||
* Default: `TestFiles/BoucleChunky001.zip`
|
|
||||||
* `--preset PRESET_NAME`:
|
|
||||||
* Specifies the name of the preset to be selected and used for rule prediction and processing.
|
|
||||||
* Default: `Dinesen`
|
|
||||||
* `--expectedrules PATH_TO_JSON`:
|
|
||||||
* Specifies the path to a JSON file containing the expected rule structure that should be generated after the preset is applied to the input asset.
|
|
||||||
* Default: `TestFiles/test-BoucleChunky001.json`
|
|
||||||
* `--outputdir PATH_TO_DIR`:
|
|
||||||
* Specifies the directory where the processed assets will be written.
|
|
||||||
* Default: `TestFiles/TestOutputs/DefaultTestOutput`
|
|
||||||
* `--search "SEARCH_TERM"` (optional):
|
|
||||||
* A string to search for within the application logs generated during the test run. If found, matching log lines (with context) will be highlighted.
|
|
||||||
* Default: None
|
|
||||||
* `--additional-lines NUM_LINES` (optional):
|
|
||||||
* When using `--search`, this specifies how many lines of context before and after each matching log line should be displayed.
|
|
||||||
* Default: `0`
|
|
||||||
|
|
||||||
**Example Usage:**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Run with default test files and settings
|
|
||||||
python autotest.py
|
|
||||||
|
|
||||||
# Run with specific test files and search for a log message
|
|
||||||
python autotest.py --zipfile TestFiles/MySpecificAsset.zip --preset MyPreset --expectedrules TestFiles/MySpecificAsset_rules.json --outputdir TestFiles/TestOutputs/MySpecificOutput --search "Processing complete for asset"
|
|
||||||
```
|
|
||||||
|
|
||||||
## `TestFiles` Directory
|
|
||||||
|
|
||||||
The autotest script relies on a directory named `TestFiles` located in the project root. This directory should contain:
|
|
||||||
|
|
||||||
* **Test Asset `.zip` files:** The actual asset archives used as input for tests (e.g., `default_test_asset.zip`, `MySpecificAsset.zip`).
|
|
||||||
* **Expected Rules `.json` files:** JSON files defining the expected rule structure for a given asset and preset combination (e.g., `default_test_asset_rules.json`, `MySpecificAsset_rules.json`). The structure of this file is detailed in the main autotest plan (`AUTOTEST_GUI_PLAN.md`).
|
|
||||||
* **`TestOutputs/` subdirectory:** This is the default parent directory where the autotest script will create specific output folders for each test run (e.g., `TestFiles/TestOutputs/DefaultTestOutput/`).
|
|
||||||
|
|
||||||
## Test Workflow
|
|
||||||
|
|
||||||
When executed, `autotest.py` performs the following steps:
|
|
||||||
|
|
||||||
1. **Initialization:** Parses command-line arguments and initializes the main application components headlessly.
|
|
||||||
2. **Load Expected Rules:** Loads the `expected_rules.json` file.
|
|
||||||
3. **Load Asset:** Loads the specified `.zip` file into the application.
|
|
||||||
4. **Select Preset:** Selects the specified preset. This triggers the internal rule prediction process.
|
|
||||||
5. **Await Prediction:** Waits for the rule prediction to complete.
|
|
||||||
6. **Compare Rules:** Retrieves the predicted rules from the application and compares them against the loaded expected rules. If there's a mismatch, the test typically fails at this point.
|
|
||||||
7. **Start Processing:** If the rules match, it initiates the asset processing pipeline, directing output to the specified output directory.
|
|
||||||
8. **Await Processing:** Waits for all backend processing tasks to complete.
|
|
||||||
9. **Check Output:** Verifies the existence of the output directory and lists its contents. Basic checks ensure some output was generated.
|
|
||||||
10. **Analyze Logs:** Retrieves logs from the application. If a search term was provided, it filters and displays relevant log portions. It also checks for Python tracebacks, which usually indicate a failure.
|
|
||||||
11. **Report Result:** Prints a summary of the test outcome (success or failure) and exits with an appropriate status code (0 for success, 1 for failure).
|
|
||||||
|
|
||||||
## Interpreting Results
|
|
||||||
|
|
||||||
* **Console Output:** The script will log its progress and the results of each step to the console.
|
|
||||||
* **Log Analysis:** Pay attention to the log output, especially if a `--search` term was used or if any tracebacks are reported.
|
|
||||||
* **Exit Code:**
|
|
||||||
* `0`: Test completed successfully.
|
|
||||||
* `1`: Test failed at some point (e.g., rule mismatch, processing error, traceback found).
|
|
||||||
* **Output Directory:** Inspect the contents of the specified output directory to manually verify the processed assets if needed.
|
|
||||||
|
|
||||||
This automated test helps ensure the stability of the core processing logic when driven by GUI-equivalent actions.
|
|
||||||
Binary file not shown.
@ -1,57 +0,0 @@
|
|||||||
{
|
|
||||||
"source_rules": [
|
|
||||||
{
|
|
||||||
"input_path": "BoucleChunky001.zip",
|
|
||||||
"supplier_identifier": "Dinesen",
|
|
||||||
"preset_name": null,
|
|
||||||
"assets": [
|
|
||||||
{
|
|
||||||
"asset_name": "BoucleChunky001",
|
|
||||||
"asset_type": "Surface",
|
|
||||||
"files": [
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_AO_1K_METALNESS.png",
|
|
||||||
"item_type": "MAP_AO",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_COL_1K_METALNESS.png",
|
|
||||||
"item_type": "MAP_COL",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_DISP16_1K_METALNESS.png",
|
|
||||||
"item_type": "MAP_DISP",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_DISP_1K_METALNESS.png",
|
|
||||||
"item_type": "MAP_DISP",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_Fabric.png",
|
|
||||||
"item_type": "EXTRA",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_METALNESS_1K_METALNESS.png",
|
|
||||||
"item_type": "MAP_METAL",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_NRM_1K_METALNESS.png",
|
|
||||||
"item_type": "MAP_NRM",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"file_path": "BoucleChunky001_ROUGHNESS_1K_METALNESS.png",
|
|
||||||
"item_type": "MAP_ROUGH",
|
|
||||||
"target_asset_name_override": "BoucleChunky001"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
863
autotest.py
863
autotest.py
@ -1,863 +0,0 @@
|
|||||||
import argparse
|
|
||||||
import sys
|
|
||||||
import logging
|
|
||||||
import logging.handlers
|
|
||||||
import time
|
|
||||||
import json
|
|
||||||
import shutil # Import shutil for directory operations
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import List, Dict, Any
|
|
||||||
|
|
||||||
from PySide6.QtCore import QCoreApplication, QTimer, Slot, QEventLoop, QObject, Signal
|
|
||||||
from PySide6.QtWidgets import QApplication, QListWidgetItem
|
|
||||||
|
|
||||||
# Add project root to sys.path
|
|
||||||
project_root = Path(__file__).resolve().parent
|
|
||||||
if str(project_root) not in sys.path:
|
|
||||||
sys.path.insert(0, str(project_root))
|
|
||||||
|
|
||||||
try:
|
|
||||||
from main import App
|
|
||||||
from gui.main_window import MainWindow
|
|
||||||
from rule_structure import SourceRule # Assuming SourceRule is in rule_structure.py
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"Error importing project modules: {e}")
|
|
||||||
print(f"Ensure that the script is run from the project root or that the project root is in PYTHONPATH.")
|
|
||||||
print(f"Current sys.path: {sys.path}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Global variable for the memory log handler
|
|
||||||
autotest_memory_handler = None
|
|
||||||
|
|
||||||
# Custom Log Filter for Concise Output
|
|
||||||
class InfoSummaryFilter(logging.Filter):
|
|
||||||
# Keywords that identify INFO messages to *allow* for concise output
|
|
||||||
SUMMARY_KEYWORDS_PRECISE = [
|
|
||||||
"Test run completed",
|
|
||||||
"Test succeeded",
|
|
||||||
"Test failed",
|
|
||||||
"Rule comparison successful",
|
|
||||||
"Rule comparison failed",
|
|
||||||
"ProcessingEngine finished. Summary:",
|
|
||||||
"Autotest Context:",
|
|
||||||
"Parsed CLI arguments:",
|
|
||||||
"Prediction completed successfully.",
|
|
||||||
"Processing completed.",
|
|
||||||
"Signal 'all_tasks_finished' received",
|
|
||||||
"final status:", # To catch "Asset '...' final status:"
|
|
||||||
"User settings file not found:",
|
|
||||||
"MainPanelWidget: Default output directory set to:",
|
|
||||||
# Search related (as per original filter)
|
|
||||||
"Searching logs for term",
|
|
||||||
"Search term ",
|
|
||||||
"Found ",
|
|
||||||
"No tracebacks found in the logs.",
|
|
||||||
"--- End Log Analysis ---",
|
|
||||||
"Log analysis completed.",
|
|
||||||
]
|
|
||||||
# Patterns for case-insensitive rejection
|
|
||||||
REJECT_PATTERNS_LOWER = [
|
|
||||||
# Original debug prefixes (ensure these are still relevant or merge if needed)
|
|
||||||
"debug:", "orchestrator_trace:", "configuration_debug:", "app_debug:", "output_org_debug:",
|
|
||||||
# Iterative / Per-item / Per-file details / Intermediate steps
|
|
||||||
": item ", # Catches "Asset '...', Item X/Y"
|
|
||||||
"item successfully processed and saved",
|
|
||||||
", file '", # Catches "Asset '...', File '...'"
|
|
||||||
": processing regular map",
|
|
||||||
": found source file:",
|
|
||||||
": determined source bit depth:",
|
|
||||||
"successfully processed regular map",
|
|
||||||
"successfully created mergetaskdefinition",
|
|
||||||
": preparing processing items",
|
|
||||||
": finished preparing items. found",
|
|
||||||
": starting core item processing loop",
|
|
||||||
", task '",
|
|
||||||
": processing merge task",
|
|
||||||
"loaded from context:",
|
|
||||||
"using dimensions from first loaded input",
|
|
||||||
"successfully merged inputs into image",
|
|
||||||
"successfully processed merge task",
|
|
||||||
"mergedtaskprocessorstage result",
|
|
||||||
"calling savevariantsstage",
|
|
||||||
"savevariantsstage result",
|
|
||||||
"adding final details to context",
|
|
||||||
": finished core item processing loop",
|
|
||||||
": copied variant",
|
|
||||||
": copied extra file",
|
|
||||||
": successfully organized",
|
|
||||||
": output organization complete.",
|
|
||||||
": metadata saved to",
|
|
||||||
"worker thread: starting processing for rule:",
|
|
||||||
"preparing workspace for input:",
|
|
||||||
"input is a supported archive",
|
|
||||||
"calling processingengine.process with rule",
|
|
||||||
"calculated sha5 for",
|
|
||||||
"calculated next incrementing value for",
|
|
||||||
"verify: processingengine.process called",
|
|
||||||
": effective supplier set to",
|
|
||||||
": metadata initialized.",
|
|
||||||
": file rules queued for processing",
|
|
||||||
"successfully loaded base application settings",
|
|
||||||
"successfully loaded and merged asset_type_definitions",
|
|
||||||
"successfully loaded and merged file_type_definitions",
|
|
||||||
"starting rule-based prediction for:",
|
|
||||||
"rule-based prediction finished successfully for",
|
|
||||||
"finished rule-based prediction run for",
|
|
||||||
"updating model with rule-based results for source:",
|
|
||||||
"debug task ",
|
|
||||||
"worker thread: finished processing for rule:",
|
|
||||||
"task finished signal received for",
|
|
||||||
# Autotest step markers (not global summaries)
|
|
||||||
"step 1: loading zip file:",
|
|
||||||
"step 2: selecting preset:",
|
|
||||||
"step 4: retrieving and comparing rules...",
|
|
||||||
"step 5: starting processing...",
|
|
||||||
"step 7: checking output path:",
|
|
||||||
"output path check completed.",
|
|
||||||
]
|
|
||||||
|
|
||||||
def filter(self, record):
|
|
||||||
# Allow CRITICAL, ERROR, WARNING unconditionally
|
|
||||||
if record.levelno >= logging.WARNING:
|
|
||||||
return True
|
|
||||||
|
|
||||||
if record.levelno == logging.INFO:
|
|
||||||
msg = record.getMessage()
|
|
||||||
msg_lower = msg.lower() # For case-insensitive pattern rejection
|
|
||||||
|
|
||||||
# 1. Explicitly REJECT if message contains verbose patterns (case-insensitive)
|
|
||||||
for pattern in self.REJECT_PATTERNS_LOWER: # Use the new list
|
|
||||||
if pattern in msg_lower:
|
|
||||||
return False # Reject
|
|
||||||
|
|
||||||
# 2. Then, if not rejected, ALLOW only if message contains precise summary keywords
|
|
||||||
for keyword in self.SUMMARY_KEYWORDS_PRECISE: # Use the new list
|
|
||||||
if keyword in msg: # Original message for case-sensitive summary keywords if needed
|
|
||||||
return True # Allow
|
|
||||||
|
|
||||||
# 3. Reject all other INFO messages that don't match precise summary keywords
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Reject levels below INFO (e.g., DEBUG) by default for this handler
|
|
||||||
return False
|
|
||||||
|
|
||||||
# --- Root Logger Configuration for Concise Console Output ---
|
|
||||||
def setup_autotest_logging():
|
|
||||||
"""
|
|
||||||
Configures the root logger for concise console output for autotest.py.
|
|
||||||
This ensures that only essential summary information, warnings, and errors
|
|
||||||
are displayed on the console by default.
|
|
||||||
"""
|
|
||||||
root_logger = logging.getLogger()
|
|
||||||
|
|
||||||
# 1. Remove all existing handlers from the root logger.
|
|
||||||
# This prevents interference from other logging configurations.
|
|
||||||
for handler in root_logger.handlers[:]:
|
|
||||||
root_logger.removeHandler(handler)
|
|
||||||
handler.close() # Close handler before removing
|
|
||||||
|
|
||||||
# 2. Set the root logger's level to DEBUG to capture everything for the memory handler.
|
|
||||||
# The console handler will still filter down to INFO/selected.
|
|
||||||
root_logger.setLevel(logging.DEBUG) # Changed from INFO to DEBUG
|
|
||||||
|
|
||||||
# 3. Create a new StreamHandler for sys.stdout (for concise console output).
|
|
||||||
console_handler = logging.StreamHandler(sys.stdout)
|
|
||||||
|
|
||||||
# 4. Set this console handler's level to INFO.
|
|
||||||
# The filter will then decide which INFO messages to display on console.
|
|
||||||
console_handler.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
# 5. Apply the enhanced InfoSummaryFilter to the console handler.
|
|
||||||
info_filter = InfoSummaryFilter()
|
|
||||||
console_handler.addFilter(info_filter)
|
|
||||||
|
|
||||||
# 6. Set a concise formatter for the console handler.
|
|
||||||
formatter = logging.Formatter('[%(levelname)s] %(message)s')
|
|
||||||
console_handler.setFormatter(formatter)
|
|
||||||
|
|
||||||
# 7. Add this newly configured console handler to the root_logger.
|
|
||||||
root_logger.addHandler(console_handler)
|
|
||||||
|
|
||||||
# 8. Setup the MemoryHandler
|
|
||||||
global autotest_memory_handler # Declare usage of global
|
|
||||||
autotest_memory_handler = logging.handlers.MemoryHandler(
|
|
||||||
capacity=20000, # Increased capacity
|
|
||||||
flushLevel=logging.CRITICAL + 1, # Prevent automatic flushing
|
|
||||||
target=None # Does not flush to another handler
|
|
||||||
)
|
|
||||||
autotest_memory_handler.setLevel(logging.DEBUG) # Capture all logs from DEBUG up
|
|
||||||
# Not adding a formatter here, will format in _process_and_display_logs
|
|
||||||
|
|
||||||
# 9. Add the memory handler to the root logger.
|
|
||||||
root_logger.addHandler(autotest_memory_handler)
|
|
||||||
|
|
||||||
# Call the setup function early in the script's execution.
|
|
||||||
setup_autotest_logging()
|
|
||||||
|
|
||||||
# Logger for autotest.py's own messages.
|
|
||||||
# Messages from this logger will propagate to the root logger and be filtered
|
|
||||||
# by the console_handler configured above.
|
|
||||||
# Setting its level to DEBUG allows autotest.py to generate DEBUG messages,
|
|
||||||
# which won't appear on the concise console (due to handler's INFO level)
|
|
||||||
# but can be captured by other handlers (e.g., the GUI's log console).
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
logger.setLevel(logging.DEBUG) # Ensure autotest.py can generate DEBUGs for other handlers
|
|
||||||
|
|
||||||
# Note: The GUI's log console (e.g., self.main_window.log_console.log_console_output)
|
|
||||||
# is assumed to capture all logs (including DEBUG) from various modules.
|
|
||||||
# The _process_and_display_logs function then uses these comprehensive logs for the --search feature.
|
|
||||||
# This root logger setup primarily makes autotest.py's direct console output concise,
|
|
||||||
# ensuring that only filtered, high-level information appears on stdout by default.
|
|
||||||
# --- End of Root Logger Configuration ---
|
|
||||||
|
|
||||||
# --- Argument Parsing ---
|
|
||||||
def parse_arguments():
|
|
||||||
"""Parses command-line arguments for the autotest script."""
|
|
||||||
parser = argparse.ArgumentParser(description="Automated test script for Asset Processor GUI.")
|
|
||||||
parser.add_argument(
|
|
||||||
"--zipfile",
|
|
||||||
type=Path,
|
|
||||||
default=project_root / "TestFiles" / "BoucleChunky001.zip",
|
|
||||||
help="Path to the test asset ZIP file. Default: TestFiles/BoucleChunky001.zip"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--preset",
|
|
||||||
type=str,
|
|
||||||
default="Dinesen", # This should match a preset name in the application
|
|
||||||
help="Name of the preset to use. Default: Dinesen"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--expectedrules",
|
|
||||||
type=Path,
|
|
||||||
default=project_root / "TestFiles" / "Test-BoucleChunky001.json",
|
|
||||||
help="Path to the JSON file with expected rules. Default: TestFiles/Test-BoucleChunky001.json"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--outputdir",
|
|
||||||
type=Path,
|
|
||||||
default=project_root / "TestFiles" / "TestOutputs" / "BoucleChunkyOutput",
|
|
||||||
help="Path for processing output. Default: TestFiles/TestOutputs/BoucleChunkyOutput"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--search",
|
|
||||||
type=str,
|
|
||||||
default=None,
|
|
||||||
help="Optional log search term. Default: None"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--additional-lines",
|
|
||||||
type=int,
|
|
||||||
default=0,
|
|
||||||
help="Context lines for log search. Default: 0"
|
|
||||||
)
|
|
||||||
return parser.parse_args()
|
|
||||||
|
|
||||||
class AutoTester(QObject):
|
|
||||||
"""
|
|
||||||
Handles the automated testing process for the Asset Processor GUI.
|
|
||||||
"""
|
|
||||||
# Define signals if needed, e.g., for specific test events
|
|
||||||
# test_step_completed = Signal(str)
|
|
||||||
|
|
||||||
def __init__(self, app_instance: App, cli_args: argparse.Namespace):
|
|
||||||
super().__init__()
|
|
||||||
self.app_instance: App = app_instance
|
|
||||||
self.main_window: MainWindow = app_instance.main_window
|
|
||||||
self.cli_args: argparse.Namespace = cli_args
|
|
||||||
self.event_loop = QEventLoop(self)
|
|
||||||
self.prediction_poll_timer = QTimer(self)
|
|
||||||
self.expected_rules_data: Dict[str, Any] = {}
|
|
||||||
self.test_step: str = "INIT" # Possible values: INIT, LOADING_ZIP, SELECTING_PRESET, AWAITING_PREDICTION, PREDICTION_COMPLETE, COMPARING_RULES, STARTING_PROCESSING, AWAITING_PROCESSING, PROCESSING_COMPLETE, CHECKING_OUTPUT, ANALYZING_LOGS, DONE
|
|
||||||
|
|
||||||
if not self.main_window:
|
|
||||||
logger.error("MainWindow instance not found in App. Cannot proceed.")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Connect signals
|
|
||||||
if hasattr(self.app_instance, 'all_tasks_finished') and isinstance(self.app_instance.all_tasks_finished, Signal):
|
|
||||||
self.app_instance.all_tasks_finished.connect(self._on_all_tasks_finished)
|
|
||||||
else:
|
|
||||||
logger.warning("App instance does not have 'all_tasks_finished' signal or it's not a Signal. Processing completion might not be detected.")
|
|
||||||
|
|
||||||
self._load_expected_rules()
|
|
||||||
|
|
||||||
def _load_expected_rules(self) -> None:
|
|
||||||
"""Loads the expected rules from the JSON file specified by cli_args."""
|
|
||||||
self.test_step = "LOADING_EXPECTED_RULES"
|
|
||||||
logger.debug(f"Loading expected rules from: {self.cli_args.expectedrules}")
|
|
||||||
try:
|
|
||||||
with open(self.cli_args.expectedrules, 'r') as f:
|
|
||||||
self.expected_rules_data = json.load(f)
|
|
||||||
logger.debug("Expected rules loaded successfully.")
|
|
||||||
except FileNotFoundError:
|
|
||||||
logger.error(f"Expected rules file not found: {self.cli_args.expectedrules}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
logger.error(f"Error decoding expected rules JSON: {e}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"An unexpected error occurred while loading expected rules: {e}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
|
|
||||||
def run_test(self) -> None:
|
|
||||||
"""Orchestrates the test steps."""
|
|
||||||
logger.info("Starting test run...")
|
|
||||||
|
|
||||||
if not self.expected_rules_data: # Ensure rules were loaded
|
|
||||||
logger.error("Expected rules not loaded. Aborting test.")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Add a specific summary log for essential context
|
|
||||||
logger.info(f"Autotest Context: Input='{self.cli_args.zipfile.name}', Preset='{self.cli_args.preset}', Output='{self.cli_args.outputdir}'")
|
|
||||||
|
|
||||||
# Step 1: Load ZIP
|
|
||||||
self.test_step = "LOADING_ZIP"
|
|
||||||
logger.info(f"Step 1: Loading ZIP file: {self.cli_args.zipfile}") # KEEP INFO - Passes filter
|
|
||||||
if not self.cli_args.zipfile.exists():
|
|
||||||
logger.error(f"ZIP file not found: {self.cli_args.zipfile}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
# Assuming add_input_paths can take a list of strings or Path objects
|
|
||||||
self.main_window.add_input_paths([str(self.cli_args.zipfile)])
|
|
||||||
logger.debug("ZIP file loading initiated.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error during ZIP file loading: {e}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Step 2: Select Preset
|
|
||||||
self.test_step = "SELECTING_PRESET"
|
|
||||||
logger.info(f"Step 2: Selecting preset: {self.cli_args.preset}") # KEEP INFO - Passes filter
|
|
||||||
preset_found = False
|
|
||||||
preset_list_widget = self.main_window.preset_editor_widget.editor_preset_list
|
|
||||||
for i in range(preset_list_widget.count()):
|
|
||||||
item = preset_list_widget.item(i)
|
|
||||||
if item and item.text() == self.cli_args.preset:
|
|
||||||
preset_list_widget.setCurrentItem(item)
|
|
||||||
logger.debug(f"Preset '{self.cli_args.preset}' selected.")
|
|
||||||
preset_found = True
|
|
||||||
break
|
|
||||||
if not preset_found:
|
|
||||||
logger.error(f"Preset '{self.cli_args.preset}' not found in the list.")
|
|
||||||
available_presets = [preset_list_widget.item(i).text() for i in range(preset_list_widget.count())]
|
|
||||||
logger.debug(f"Available presets: {available_presets}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Step 3: Await Prediction Completion
|
|
||||||
self.test_step = "AWAITING_PREDICTION"
|
|
||||||
logger.debug("Step 3: Awaiting prediction completion...")
|
|
||||||
self.prediction_poll_timer.timeout.connect(self._check_prediction_status)
|
|
||||||
self.prediction_poll_timer.start(500) # Poll every 500ms
|
|
||||||
|
|
||||||
# Use a QTimer to allow event loop to process while waiting for this step
|
|
||||||
# This ensures that the _check_prediction_status can be called.
|
|
||||||
# We will exit this event_loop from _check_prediction_status when prediction is done.
|
|
||||||
logger.debug("Starting event loop for prediction...")
|
|
||||||
self.event_loop.exec() # This loop is quit by _check_prediction_status
|
|
||||||
self.prediction_poll_timer.stop()
|
|
||||||
logger.debug("Event loop for prediction finished.")
|
|
||||||
|
|
||||||
|
|
||||||
if self.test_step != "PREDICTION_COMPLETE":
|
|
||||||
logger.error(f"Prediction did not complete as expected. Current step: {self.test_step}")
|
|
||||||
# Check if there were any pending predictions that never cleared
|
|
||||||
if hasattr(self.main_window, '_pending_predictions'):
|
|
||||||
logger.error(f"Pending predictions at timeout: {self.main_window._pending_predictions}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
logger.info("Prediction completed successfully.") # KEEP INFO - Passes filter
|
|
||||||
|
|
||||||
# Step 4: Retrieve & Compare Rulelist
|
|
||||||
self.test_step = "COMPARING_RULES"
|
|
||||||
logger.info("Step 4: Retrieving and Comparing Rules...") # KEEP INFO - Passes filter
|
|
||||||
actual_source_rules_list: List[SourceRule] = self.main_window.unified_model.get_all_source_rules()
|
|
||||||
actual_rules_obj = actual_source_rules_list # Keep the SourceRule list for processing
|
|
||||||
|
|
||||||
comparable_actual_rules = self._convert_rules_to_comparable(actual_source_rules_list)
|
|
||||||
|
|
||||||
if not self._compare_rules(comparable_actual_rules, self.expected_rules_data):
|
|
||||||
logger.error("Rule comparison failed. See logs for details.")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
logger.info("Rule comparison successful.") # KEEP INFO - Passes filter
|
|
||||||
|
|
||||||
# Step 5: Start Processing
|
|
||||||
self.test_step = "START_PROCESSING"
|
|
||||||
logger.info("Step 5: Starting Processing...") # KEEP INFO - Passes filter
|
|
||||||
processing_settings = {
|
|
||||||
"output_dir": str(self.cli_args.outputdir), # Ensure it's a string for JSON/config
|
|
||||||
"overwrite": True,
|
|
||||||
"workers": 1,
|
|
||||||
"blender_enabled": False # Basic test, no Blender
|
|
||||||
}
|
|
||||||
try:
|
|
||||||
Path(self.cli_args.outputdir).mkdir(parents=True, exist_ok=True)
|
|
||||||
logger.debug(f"Ensured output directory exists: {self.cli_args.outputdir}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Could not create output directory {self.cli_args.outputdir}: {e}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
|
|
||||||
if hasattr(self.main_window, 'start_backend_processing') and isinstance(self.main_window.start_backend_processing, Signal):
|
|
||||||
logger.debug(f"Emitting start_backend_processing with rules count: {len(actual_rules_obj)} and settings: {processing_settings}")
|
|
||||||
self.main_window.start_backend_processing.emit(actual_rules_obj, processing_settings)
|
|
||||||
else:
|
|
||||||
logger.error("'start_backend_processing' signal not found on MainWindow. Cannot start processing.")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Step 6: Await Processing Completion
|
|
||||||
self.test_step = "AWAIT_PROCESSING"
|
|
||||||
logger.debug("Step 6: Awaiting processing completion...")
|
|
||||||
self.event_loop.exec() # This loop is quit by _on_all_tasks_finished
|
|
||||||
|
|
||||||
if self.test_step != "PROCESSING_COMPLETE":
|
|
||||||
logger.error(f"Processing did not complete as expected. Current step: {self.test_step}")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
logger.info("Processing completed.") # KEEP INFO - Passes filter
|
|
||||||
|
|
||||||
# Step 7: Check Output Path
|
|
||||||
self.test_step = "CHECK_OUTPUT"
|
|
||||||
logger.info(f"Step 7: Checking output path: {self.cli_args.outputdir}") # KEEP INFO - Passes filter
|
|
||||||
output_path = Path(self.cli_args.outputdir)
|
|
||||||
if not output_path.exists() or not output_path.is_dir():
|
|
||||||
logger.error(f"Output directory {output_path} does not exist or is not a directory.")
|
|
||||||
self.cleanup_and_exit(success=False)
|
|
||||||
return
|
|
||||||
|
|
||||||
output_items = list(output_path.iterdir())
|
|
||||||
if not output_items:
|
|
||||||
logger.warning(f"Output directory {output_path} is empty. This might be a test failure depending on the case.")
|
|
||||||
# For a more specific check, one might iterate through actual_rules_obj
|
|
||||||
# and verify if subdirectories matching asset_name exist.
|
|
||||||
# e.g. for asset_rule in source_rule.assets:
|
|
||||||
# expected_asset_dir = output_path / asset_rule.asset_name
|
|
||||||
# if not expected_asset_dir.is_dir(): logger.error(...)
|
|
||||||
else:
|
|
||||||
logger.debug(f"Found {len(output_items)} item(s) in output directory:")
|
|
||||||
for item in output_items:
|
|
||||||
logger.debug(f" - {item.name} ({'dir' if item.is_dir() else 'file'})")
|
|
||||||
logger.info("Output path check completed.") # KEEP INFO - Passes filter
|
|
||||||
|
|
||||||
# Step 8: Retrieve & Analyze Logs
|
|
||||||
self.test_step = "CHECK_LOGS"
|
|
||||||
logger.debug("Step 8: Retrieving and Analyzing Logs...")
|
|
||||||
all_logs_text = ""
|
|
||||||
if self.main_window.log_console and self.main_window.log_console.log_console_output:
|
|
||||||
all_logs_text = self.main_window.log_console.log_console_output.toPlainText()
|
|
||||||
else:
|
|
||||||
logger.warning("Log console or output widget not found. Cannot retrieve logs.")
|
|
||||||
|
|
||||||
self._process_and_display_logs(all_logs_text)
|
|
||||||
logger.info("Log analysis completed.")
|
|
||||||
|
|
||||||
# Final Step
|
|
||||||
logger.info("Test run completed successfully.") # KEEP INFO - Passes filter
|
|
||||||
self.cleanup_and_exit(success=True)
|
|
||||||
|
|
||||||
@Slot()
|
|
||||||
def _check_prediction_status(self) -> None:
|
|
||||||
"""Polls the main window for pending predictions."""
|
|
||||||
# logger.debug(f"Checking prediction status. Pending: {self.main_window._pending_predictions if hasattr(self.main_window, '_pending_predictions') else 'N/A'}")
|
|
||||||
if hasattr(self.main_window, '_pending_predictions'):
|
|
||||||
if not self.main_window._pending_predictions: # Assuming _pending_predictions is a list/dict that's empty when done
|
|
||||||
logger.debug("No pending predictions. Prediction assumed complete.")
|
|
||||||
self.test_step = "PREDICTION_COMPLETE"
|
|
||||||
if self.event_loop.isRunning():
|
|
||||||
self.event_loop.quit()
|
|
||||||
# else:
|
|
||||||
# logger.debug(f"Still awaiting predictions: {len(self.main_window._pending_predictions)} remaining.")
|
|
||||||
else:
|
|
||||||
logger.warning("'_pending_predictions' attribute not found on MainWindow. Cannot check prediction status automatically.")
|
|
||||||
# As a fallback, if the attribute is missing, we might assume prediction is instant or needs manual check.
|
|
||||||
# For now, let's assume it means it's done if the attribute is missing, but this is risky.
|
|
||||||
# A better approach would be to have a clear signal from MainWindow when predictions are done.
|
|
||||||
self.test_step = "PREDICTION_COMPLETE" # Risky assumption
|
|
||||||
if self.event_loop.isRunning():
|
|
||||||
self.event_loop.quit()
|
|
||||||
|
|
||||||
|
|
||||||
@Slot(int, int, int)
|
|
||||||
def _on_all_tasks_finished(self, processed_count: int, skipped_count: int, failed_count: int) -> None:
|
|
||||||
"""Slot for App.all_tasks_finished signal."""
|
|
||||||
logger.info(f"Signal 'all_tasks_finished' received: Processed={processed_count}, Skipped={skipped_count}, Failed={failed_count}") # KEEP INFO - Passes filter
|
|
||||||
|
|
||||||
if self.test_step == "AWAIT_PROCESSING":
|
|
||||||
logger.debug("Processing completion signal received.") # Covered by the summary log above
|
|
||||||
if failed_count > 0:
|
|
||||||
logger.error(f"Processing finished with {failed_count} failed task(s).")
|
|
||||||
# Even if tasks failed, the test might pass based on output checks.
|
|
||||||
# The error is logged for information.
|
|
||||||
self.test_step = "PROCESSING_COMPLETE"
|
|
||||||
if self.event_loop.isRunning():
|
|
||||||
self.event_loop.quit()
|
|
||||||
else:
|
|
||||||
logger.warning(f"Signal 'all_tasks_finished' received at an unexpected test step: '{self.test_step}'. Counts: P={processed_count}, S={skipped_count}, F={failed_count}")
|
|
||||||
|
|
||||||
|
|
||||||
def _convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> Dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Converts a list of SourceRule objects to a dictionary structure
|
|
||||||
suitable for comparison with the expected_rules.json.
|
|
||||||
"""
|
|
||||||
logger.debug(f"Converting {len(source_rules_list)} SourceRule objects to comparable dictionary...")
|
|
||||||
comparable_sources_list = []
|
|
||||||
for source_rule_obj in source_rules_list:
|
|
||||||
comparable_asset_list = []
|
|
||||||
# source_rule_obj.assets is List[AssetRule]
|
|
||||||
for asset_rule_obj in source_rule_obj.assets:
|
|
||||||
comparable_file_list = []
|
|
||||||
# asset_rule_obj.files is List[FileRule]
|
|
||||||
for file_rule_obj in asset_rule_obj.files:
|
|
||||||
comparable_file_list.append({
|
|
||||||
"file_path": file_rule_obj.file_path,
|
|
||||||
"item_type": file_rule_obj.item_type,
|
|
||||||
"target_asset_name_override": file_rule_obj.target_asset_name_override
|
|
||||||
})
|
|
||||||
comparable_asset_list.append({
|
|
||||||
"asset_name": asset_rule_obj.asset_name,
|
|
||||||
"asset_type": asset_rule_obj.asset_type,
|
|
||||||
"files": comparable_file_list
|
|
||||||
})
|
|
||||||
comparable_sources_list.append({
|
|
||||||
"input_path": Path(source_rule_obj.input_path).name, # Use only the filename
|
|
||||||
"supplier_identifier": source_rule_obj.supplier_identifier,
|
|
||||||
"preset_name": source_rule_obj.preset_name,
|
|
||||||
"assets": comparable_asset_list
|
|
||||||
})
|
|
||||||
logger.debug("Conversion to comparable dictionary finished.")
|
|
||||||
return {"source_rules": comparable_sources_list}
|
|
||||||
|
|
||||||
def _compare_rule_item(self, actual_item: Dict[str, Any], expected_item: Dict[str, Any], item_type_name: str, parent_context: str = "") -> bool:
|
|
||||||
"""
|
|
||||||
Recursively compares an individual actual rule item dictionary with an expected rule item dictionary.
|
|
||||||
Logs differences and returns True if they match, False otherwise.
|
|
||||||
"""
|
|
||||||
item_match = True
|
|
||||||
|
|
||||||
identifier = ""
|
|
||||||
if item_type_name == "SourceRule":
|
|
||||||
identifier = expected_item.get('input_path', f'UnknownSource_at_{parent_context}')
|
|
||||||
elif item_type_name == "AssetRule":
|
|
||||||
identifier = expected_item.get('asset_name', f'UnknownAsset_at_{parent_context}')
|
|
||||||
elif item_type_name == "FileRule":
|
|
||||||
identifier = expected_item.get('file_path', f'UnknownFile_at_{parent_context}')
|
|
||||||
|
|
||||||
current_context = f"{parent_context}/{identifier}" if parent_context else identifier
|
|
||||||
|
|
||||||
# Log Extra Fields: Iterate through keys in actual_item.
|
|
||||||
# If a key is in actual_item but not in expected_item (and is not a list container like "assets" or "files"),
|
|
||||||
# log this as an informational message.
|
|
||||||
for key in actual_item.keys():
|
|
||||||
if key not in expected_item and key not in ["assets", "files"]:
|
|
||||||
logger.debug(f"Field '{key}' present in actual {item_type_name} ({current_context}) but not specified in expected. Value: '{actual_item[key]}'")
|
|
||||||
|
|
||||||
# Check Expected Fields: Iterate through keys in expected_item.
|
|
||||||
for key, expected_value in expected_item.items():
|
|
||||||
if key not in actual_item:
|
|
||||||
logger.error(f"Missing expected field '{key}' in actual {item_type_name} ({current_context}).")
|
|
||||||
item_match = False
|
|
||||||
continue # Continue to check other fields in the expected_item
|
|
||||||
|
|
||||||
actual_value = actual_item[key]
|
|
||||||
|
|
||||||
if key == "assets": # List of AssetRule dictionaries
|
|
||||||
if not self._compare_list_of_rules(actual_value, expected_value, "AssetRule", current_context, "asset_name"):
|
|
||||||
item_match = False
|
|
||||||
elif key == "files": # List of FileRule dictionaries
|
|
||||||
if not self._compare_list_of_rules(actual_value, expected_value, "FileRule", current_context, "file_path"):
|
|
||||||
item_match = False
|
|
||||||
else: # Regular field comparison
|
|
||||||
if actual_value != expected_value:
|
|
||||||
# Handle None vs "None" string for preset_name specifically if it's a common issue
|
|
||||||
if key == "preset_name" and actual_value is None and expected_value == "None":
|
|
||||||
logger.debug(f"Field '{key}' in {item_type_name} ({current_context}): Actual is None, Expected is string \"None\". Treating as match for now.")
|
|
||||||
elif key == "target_asset_name_override" and actual_value is not None and expected_value is None:
|
|
||||||
# If actual has a value (e.g. parent asset name) and expected is null/None,
|
|
||||||
# this is a mismatch according to strict comparison.
|
|
||||||
# For a more lenient check, this logic could be adjusted here.
|
|
||||||
# Current strict comparison will flag this as error, which is what the logs show.
|
|
||||||
logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
|
|
||||||
item_match = False
|
|
||||||
else:
|
|
||||||
logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
|
|
||||||
item_match = False
|
|
||||||
|
|
||||||
return item_match
|
|
||||||
|
|
||||||
def _compare_list_of_rules(self, actual_list: List[Dict[str, Any]], expected_list: List[Dict[str, Any]], item_type_name: str, parent_context: str, item_key_field: str) -> bool:
|
|
||||||
"""
|
|
||||||
Compares a list of actual rule items against a list of expected rule items.
|
|
||||||
Items are matched by a key field (e.g., 'asset_name' or 'file_path').
|
|
||||||
Order independent for matching, but logs count mismatches.
|
|
||||||
"""
|
|
||||||
list_match = True # Corrected indentation
|
|
||||||
if not isinstance(actual_list, list) or not isinstance(expected_list, list):
|
|
||||||
logger.error(f"Type mismatch for list of {item_type_name}s in {parent_context}. Expected lists.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
if len(actual_list) != len(expected_list):
|
|
||||||
logger.error(f"Mismatch in number of {item_type_name}s for {parent_context}. Actual: {len(actual_list)}, Expected: {len(expected_list)}.")
|
|
||||||
list_match = False # Count mismatch is an error
|
|
||||||
# If counts differ, we still try to match what we can to provide more detailed feedback,
|
|
||||||
# but the overall list_match will remain False.
|
|
||||||
|
|
||||||
actual_items_map = {item.get(item_key_field): item for item in actual_list if item.get(item_key_field) is not None}
|
|
||||||
|
|
||||||
# Keep track of expected items that found a match to identify missing ones more easily
|
|
||||||
matched_expected_keys = set()
|
|
||||||
|
|
||||||
for expected_item in expected_list:
|
|
||||||
expected_key_value = expected_item.get(item_key_field)
|
|
||||||
if expected_key_value is None:
|
|
||||||
logger.error(f"Expected {item_type_name} in {parent_context} is missing key field '{item_key_field}'. Cannot compare this item: {expected_item}")
|
|
||||||
list_match = False # This specific expected item cannot be processed
|
|
||||||
continue
|
|
||||||
|
|
||||||
actual_item = actual_items_map.get(expected_key_value)
|
|
||||||
if actual_item:
|
|
||||||
matched_expected_keys.add(expected_key_value)
|
|
||||||
if not self._compare_rule_item(actual_item, expected_item, item_type_name, parent_context):
|
|
||||||
list_match = False # Individual item comparison failed
|
|
||||||
else:
|
|
||||||
logger.error(f"Expected {item_type_name} with {item_key_field} '{expected_key_value}' not found in actual items for {parent_context}.")
|
|
||||||
list_match = False
|
|
||||||
|
|
||||||
# Identify actual items that were not matched by any expected item
|
|
||||||
# This is useful if len(actual_list) >= len(expected_list) but some actual items are "extra"
|
|
||||||
for actual_key_value, actual_item_data in actual_items_map.items():
|
|
||||||
if actual_key_value not in matched_expected_keys:
|
|
||||||
logger.debug(f"Extra actual {item_type_name} with {item_key_field} '{actual_key_value}' found in {parent_context} (not in expected list or already matched).")
|
|
||||||
if len(actual_list) != len(expected_list): # If counts already flagged a mismatch, this is just detail
|
|
||||||
pass
|
|
||||||
else: # Counts matched, but content didn't align perfectly by key
|
|
||||||
list_match = False
|
|
||||||
|
|
||||||
|
|
||||||
return list_match # Corrected indentation
|
|
||||||
|
|
||||||
def _compare_rules(self, actual_rules_data: Dict[str, Any], expected_rules_data: Dict[str, Any]) -> bool: # Corrected structure: moved out
|
|
||||||
item_match = False
|
|
||||||
|
|
||||||
return item_match
|
|
||||||
|
|
||||||
def _compare_rules(self, actual_rules_data: Dict[str, Any], expected_rules_data: Dict[str, Any]) -> bool:
|
|
||||||
"""
|
|
||||||
Compares the actual rule data (converted from live SourceRule objects)
|
|
||||||
with the expected rule data (loaded from JSON).
|
|
||||||
"""
|
|
||||||
logger.debug("Comparing actual rules with expected rules...")
|
|
||||||
|
|
||||||
actual_source_rules = actual_rules_data.get("source_rules", []) if actual_rules_data else []
|
|
||||||
expected_source_rules = expected_rules_data.get("source_rules", []) if expected_rules_data else []
|
|
||||||
|
|
||||||
if not isinstance(actual_source_rules, list):
|
|
||||||
logger.error(f"Actual 'source_rules' is not a list. Found type: {type(actual_source_rules)}. Comparison aborted.")
|
|
||||||
return False # Cannot compare if actual data is malformed
|
|
||||||
if not isinstance(expected_source_rules, list):
|
|
||||||
logger.error(f"Expected 'source_rules' is not a list. Found type: {type(expected_source_rules)}. Test configuration error. Comparison aborted.")
|
|
||||||
return False # Test setup error
|
|
||||||
|
|
||||||
if not expected_source_rules and not actual_source_rules:
|
|
||||||
logger.debug("Both expected and actual source rules lists are empty. Considered a match.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
if len(actual_source_rules) != len(expected_source_rules):
|
|
||||||
logger.error(f"Mismatch in the number of source rules. Actual: {len(actual_source_rules)}, Expected: {len(expected_source_rules)}.")
|
|
||||||
# Optionally, log more details about which list is longer/shorter or identifiers if available
|
|
||||||
return False
|
|
||||||
|
|
||||||
overall_match_status = True
|
|
||||||
for i in range(len(expected_source_rules)):
|
|
||||||
actual_sr = actual_source_rules[i]
|
|
||||||
expected_sr = expected_source_rules[i]
|
|
||||||
|
|
||||||
# For context, use input_path or an index
|
|
||||||
source_rule_context = expected_sr.get('input_path', f"SourceRule_index_{i}")
|
|
||||||
|
|
||||||
if not self._compare_rule_item(actual_sr, expected_sr, "SourceRule", parent_context=source_rule_context):
|
|
||||||
overall_match_status = False
|
|
||||||
# Continue checking other source rules to log all discrepancies
|
|
||||||
|
|
||||||
if overall_match_status:
|
|
||||||
logger.debug("All rules match the expected criteria.") # Covered by "Rule comparison successful" summary
|
|
||||||
else:
|
|
||||||
logger.warning("One or more rules did not match the expected criteria. See logs above for details.")
|
|
||||||
|
|
||||||
return overall_match_status
|
|
||||||
|
|
||||||
def _process_and_display_logs(self, logs_text: str) -> None: # logs_text is no longer the primary source for search
|
|
||||||
"""
|
|
||||||
Processes and displays logs, potentially filtering them if --search is used.
|
|
||||||
Also checks for tracebacks.
|
|
||||||
Sources logs from the in-memory handler for search and detailed analysis.
|
|
||||||
"""
|
|
||||||
logger.debug("--- Log Analysis ---")
|
|
||||||
global autotest_memory_handler # Access the global handler
|
|
||||||
log_records = []
|
|
||||||
if autotest_memory_handler and autotest_memory_handler.buffer:
|
|
||||||
log_records = autotest_memory_handler.buffer
|
|
||||||
|
|
||||||
formatted_log_lines = []
|
|
||||||
# Define a consistent formatter, similar to what might be expected or useful for search
|
|
||||||
record_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
|
|
||||||
# Default asctime format includes milliseconds.
|
|
||||||
|
|
||||||
|
|
||||||
for record in log_records:
|
|
||||||
formatted_log_lines.append(record_formatter.format(record))
|
|
||||||
|
|
||||||
lines_for_search_and_traceback = formatted_log_lines
|
|
||||||
|
|
||||||
if not lines_for_search_and_traceback:
|
|
||||||
logger.warning("No log records found in memory handler. No analysis to perform.")
|
|
||||||
# Still check the console logs_text for tracebacks if it exists, as a fallback
|
|
||||||
# or if some critical errors didn't make it to the memory handler (unlikely with DEBUG level)
|
|
||||||
if logs_text:
|
|
||||||
logger.debug("Checking provided logs_text (from console) for tracebacks as a fallback.")
|
|
||||||
console_lines = logs_text.splitlines()
|
|
||||||
traceback_found_console = False
|
|
||||||
for i, line in enumerate(console_lines):
|
|
||||||
if line.strip().startswith("Traceback (most recent call last):"):
|
|
||||||
logger.error(f"!!! TRACEBACK DETECTED in console logs_text around line {i+1} !!!")
|
|
||||||
traceback_found_console = True
|
|
||||||
if traceback_found_console:
|
|
||||||
logger.warning("A traceback was found in the console logs_text.")
|
|
||||||
else:
|
|
||||||
logger.info("No tracebacks found in the console logs_text either.")
|
|
||||||
logger.info("--- End Log Analysis ---")
|
|
||||||
return
|
|
||||||
|
|
||||||
traceback_found = False
|
|
||||||
|
|
||||||
if self.cli_args.search:
|
|
||||||
logger.info(f"Searching {len(lines_for_search_and_traceback)} in-memory log lines for term '{self.cli_args.search}' with {self.cli_args.additional_lines} context lines.")
|
|
||||||
matched_line_indices = [i for i, line in enumerate(lines_for_search_and_traceback) if self.cli_args.search in line]
|
|
||||||
|
|
||||||
if not matched_line_indices:
|
|
||||||
logger.info(f"Search term '{self.cli_args.search}' not found in in-memory logs.")
|
|
||||||
else:
|
|
||||||
logger.info(f"Found {len(matched_line_indices)} match(es) for '{self.cli_args.search}' in in-memory logs:")
|
|
||||||
collected_lines_to_print = set()
|
|
||||||
for match_idx in matched_line_indices:
|
|
||||||
start_idx = max(0, match_idx - self.cli_args.additional_lines)
|
|
||||||
end_idx = min(len(lines_for_search_and_traceback), match_idx + self.cli_args.additional_lines + 1)
|
|
||||||
for i in range(start_idx, end_idx):
|
|
||||||
# Use i directly as index for lines_for_search_and_traceback, line number is for display
|
|
||||||
collected_lines_to_print.add(f"L{i+1:05d}: {lines_for_search_and_traceback[i]}")
|
|
||||||
|
|
||||||
print("--- Filtered Log Output (from Memory Handler) ---")
|
|
||||||
for line_to_print in sorted(list(collected_lines_to_print)):
|
|
||||||
print(line_to_print)
|
|
||||||
print("--- End Filtered Log Output ---")
|
|
||||||
# Removed: else block that showed last N lines by default (as per original instruction for this section)
|
|
||||||
|
|
||||||
# Traceback Check (on lines_for_search_and_traceback)
|
|
||||||
for i, line in enumerate(lines_for_search_and_traceback):
|
|
||||||
if line.strip().startswith("Traceback (most recent call last):") or "Traceback (most recent call last):" in line : # More robust check
|
|
||||||
logger.error(f"!!! TRACEBACK DETECTED in in-memory logs around line index {i} !!!")
|
|
||||||
logger.error(f"Line content: {line}")
|
|
||||||
traceback_found = True
|
|
||||||
|
|
||||||
if traceback_found:
|
|
||||||
logger.warning("A traceback was found in the in-memory logs. This usually indicates a significant issue.")
|
|
||||||
else:
|
|
||||||
logger.info("No tracebacks found in the in-memory logs.") # This refers to the comprehensive memory logs
|
|
||||||
|
|
||||||
logger.info("--- End Log Analysis ---")
|
|
||||||
|
|
||||||
def cleanup_and_exit(self, success: bool = True) -> None:
|
|
||||||
"""Cleans up and exits the application."""
|
|
||||||
global autotest_memory_handler
|
|
||||||
if autotest_memory_handler:
|
|
||||||
logger.debug("Clearing memory log handler buffer and removing handler.")
|
|
||||||
autotest_memory_handler.buffer = [] # Clear buffer
|
|
||||||
logging.getLogger().removeHandler(autotest_memory_handler) # Remove handler
|
|
||||||
autotest_memory_handler.close() # MemoryHandler close is a no-op but good practice
|
|
||||||
autotest_memory_handler = None
|
|
||||||
|
|
||||||
logger.info(f"Test {'succeeded' if success else 'failed'}. Cleaning up and exiting...") # KEEP INFO - Passes filter
|
|
||||||
q_app = QCoreApplication.instance()
|
|
||||||
if q_app:
|
|
||||||
q_app.quit()
|
|
||||||
sys.exit(0 if success else 1)
|
|
||||||
|
|
||||||
# --- Main Execution ---
|
|
||||||
def main():
|
|
||||||
"""Main function to run the autotest script."""
|
|
||||||
cli_args = parse_arguments()
|
|
||||||
# Logger is configured above, this will now use the new filtered setup
|
|
||||||
logger.info(f"Parsed CLI arguments: {cli_args}") # KEEP INFO - Passes filter
|
|
||||||
|
|
||||||
# Clean and ensure output directory exists
|
|
||||||
output_dir_path = Path(cli_args.outputdir)
|
|
||||||
logger.debug(f"Preparing output directory: {output_dir_path}")
|
|
||||||
try:
|
|
||||||
if output_dir_path.exists():
|
|
||||||
logger.debug(f"Output directory {output_dir_path} exists. Cleaning its contents...")
|
|
||||||
for item in output_dir_path.iterdir():
|
|
||||||
if item.is_dir():
|
|
||||||
shutil.rmtree(item)
|
|
||||||
logger.debug(f"Removed directory: {item}")
|
|
||||||
else:
|
|
||||||
item.unlink()
|
|
||||||
logger.debug(f"Removed file: {item}")
|
|
||||||
logger.debug(f"Contents of {output_dir_path} cleaned.")
|
|
||||||
else:
|
|
||||||
logger.debug(f"Output directory {output_dir_path} does not exist. Creating it.")
|
|
||||||
|
|
||||||
output_dir_path.mkdir(parents=True, exist_ok=True) # Ensure it exists after cleaning/if it didn't exist
|
|
||||||
logger.debug(f"Output directory {output_dir_path} is ready.")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Could not prepare output directory {output_dir_path}: {e}", exc_info=True)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Initialize QApplication
|
|
||||||
# Use QCoreApplication if no GUI elements are directly interacted with by the test logic itself,
|
|
||||||
# but QApplication is needed if MainWindow or its widgets are constructed and used.
|
|
||||||
# Since MainWindow is instantiated by App, QApplication is appropriate.
|
|
||||||
q_app = QApplication.instance()
|
|
||||||
if not q_app:
|
|
||||||
q_app = QApplication(sys.argv)
|
|
||||||
if not q_app: # Still no app
|
|
||||||
logger.error("Failed to initialize QApplication.")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
logger.debug("Initializing main.App()...")
|
|
||||||
try:
|
|
||||||
# Instantiate main.App() - this should create MainWindow but not show it by default
|
|
||||||
# if App is designed to not show GUI unless app.main_window.show() is called.
|
|
||||||
app_instance = App()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to initialize main.App: {e}", exc_info=True)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if not app_instance.main_window:
|
|
||||||
logger.error("main.App initialized, but main_window is None. Cannot proceed with test.")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
logger.debug("Initializing AutoTester...")
|
|
||||||
try:
|
|
||||||
tester = AutoTester(app_instance, cli_args)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to initialize AutoTester: {e}", exc_info=True)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Use QTimer.singleShot to start the test after the Qt event loop has started.
|
|
||||||
# This ensures that the Qt environment is fully set up.
|
|
||||||
logger.debug("Scheduling test run...")
|
|
||||||
QTimer.singleShot(0, tester.run_test)
|
|
||||||
|
|
||||||
logger.debug("Starting Qt application event loop...")
|
|
||||||
exit_code = q_app.exec()
|
|
||||||
logger.debug(f"Qt application event loop finished with exit code: {exit_code}")
|
|
||||||
sys.exit(exit_code)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
13
main.py
13
main.py
@ -401,13 +401,11 @@ class App(QObject):
|
|||||||
|
|
||||||
# --- Get paths needed for ProcessingTask ---
|
# --- Get paths needed for ProcessingTask ---
|
||||||
try:
|
try:
|
||||||
# Get output_dir from processing_settings passed from autotest.py
|
# Access output path via MainPanelWidget
|
||||||
output_base_path_str = processing_settings.get("output_dir")
|
output_base_path_str = self.main_window.main_panel_widget.output_path_edit.text().strip()
|
||||||
log.info(f"APP_DEBUG: Received output_dir in processing_settings: {output_base_path_str}")
|
|
||||||
|
|
||||||
if not output_base_path_str:
|
if not output_base_path_str:
|
||||||
log.error("Cannot queue tasks: Output directory path is empty in processing_settings.")
|
log.error("Cannot queue tasks: Output directory path is empty in the GUI.")
|
||||||
# self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000) # GUI specific
|
self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000)
|
||||||
return
|
return
|
||||||
output_base_path = Path(output_base_path_str)
|
output_base_path = Path(output_base_path_str)
|
||||||
# Basic validation - check if it's likely a valid path structure (doesn't guarantee existence/writability here)
|
# Basic validation - check if it's likely a valid path structure (doesn't guarantee existence/writability here)
|
||||||
@ -479,9 +477,8 @@ class App(QObject):
|
|||||||
engine=task_engine,
|
engine=task_engine,
|
||||||
rule=rule,
|
rule=rule,
|
||||||
workspace_path=workspace_path,
|
workspace_path=workspace_path,
|
||||||
output_base_path=output_base_path # This is Path(output_base_path_str)
|
output_base_path=output_base_path
|
||||||
)
|
)
|
||||||
log.info(f"APP_DEBUG: Passing to ProcessingTask: output_base_path = {output_base_path}")
|
|
||||||
task.signals.finished.connect(self._on_task_finished)
|
task.signals.finished.connect(self._on_task_finished)
|
||||||
log.debug(f"DEBUG: Calling thread_pool.start() for task {i+1}")
|
log.debug(f"DEBUG: Calling thread_pool.start() for task {i+1}")
|
||||||
self.thread_pool.start(task)
|
self.thread_pool.start(task)
|
||||||
|
|||||||
@ -85,7 +85,6 @@ class MetadataInitializationStage(ProcessingStage):
|
|||||||
merged_maps_details.
|
merged_maps_details.
|
||||||
"""
|
"""
|
||||||
def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
|
def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
|
||||||
logger.debug(f"METADATA_INIT_DEBUG: Entry - context.output_base_path = {context.output_base_path}") # Added
|
|
||||||
"""
|
"""
|
||||||
Executes the metadata initialization logic.
|
Executes the metadata initialization logic.
|
||||||
|
|
||||||
@ -171,5 +170,4 @@ class MetadataInitializationStage(ProcessingStage):
|
|||||||
# Example of how you might log the full metadata for debugging:
|
# Example of how you might log the full metadata for debugging:
|
||||||
# logger.debug(f"Initialized metadata: {context.asset_metadata}")
|
# logger.debug(f"Initialized metadata: {context.asset_metadata}")
|
||||||
|
|
||||||
logger.debug(f"METADATA_INIT_DEBUG: Exit - context.output_base_path = {context.output_base_path}") # Added
|
|
||||||
return context
|
return context
|
||||||
@ -17,16 +17,8 @@ class OutputOrganizationStage(ProcessingStage):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
|
def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
|
||||||
asset_name_for_log_early = context.asset_rule.asset_name if hasattr(context, 'asset_rule') and context.asset_rule else "Unknown Asset (early)"
|
log.info("OUTPUT_ORG: Stage execution started for asset '%s'", context.asset_rule.asset_name)
|
||||||
log.info(f"OUTPUT_ORG_DEBUG: Stage execution started for asset '{asset_name_for_log_early}'.")
|
log.info(f"OUTPUT_ORG: context.processed_maps_details at start: {context.processed_maps_details}")
|
||||||
logger.debug(f"OUTPUT_ORG_DEBUG: Entry - context.output_base_path = {context.output_base_path}") # Modified
|
|
||||||
log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj.output_directory_base (raw from config) = {getattr(context.config_obj, 'output_directory_base', 'N/A')}")
|
|
||||||
# resolved_base = "N/A"
|
|
||||||
# if hasattr(context.config_obj, '_settings') and context.config_obj._settings.get('OUTPUT_BASE_DIR'):
|
|
||||||
# base_dir_from_settings = context.config_obj._settings.get('OUTPUT_BASE_DIR')
|
|
||||||
# Path resolution logic might be complex
|
|
||||||
# log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj._settings.OUTPUT_BASE_DIR (resolved guess) = {resolved_base}")
|
|
||||||
log.info(f"OUTPUT_ORG_DEBUG: context.processed_maps_details at start: {context.processed_maps_details}")
|
|
||||||
"""
|
"""
|
||||||
Copies temporary processed and merged files to their final output locations
|
Copies temporary processed and merged files to their final output locations
|
||||||
based on path patterns and updates AssetProcessingContext.
|
based on path patterns and updates AssetProcessingContext.
|
||||||
@ -111,9 +103,7 @@ class OutputOrganizationStage(ProcessingStage):
|
|||||||
pattern_string=output_dir_pattern,
|
pattern_string=output_dir_pattern,
|
||||||
token_data=token_data_variant_cleaned
|
token_data=token_data_variant_cleaned
|
||||||
)
|
)
|
||||||
logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Using context.output_base_path = {context.output_base_path} for final_variant_path construction.") # Added
|
|
||||||
final_variant_path = Path(context.output_base_path) / Path(relative_dir_path_str_variant) / Path(output_filename_variant)
|
final_variant_path = Path(context.output_base_path) / Path(relative_dir_path_str_variant) / Path(output_filename_variant)
|
||||||
logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Constructed final_variant_path = {final_variant_path}") # Added
|
|
||||||
final_variant_path.parent.mkdir(parents=True, exist_ok=True)
|
final_variant_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
if final_variant_path.exists() and not overwrite_existing:
|
if final_variant_path.exists() and not overwrite_existing:
|
||||||
@ -179,9 +169,7 @@ class OutputOrganizationStage(ProcessingStage):
|
|||||||
pattern_string=output_dir_pattern,
|
pattern_string=output_dir_pattern,
|
||||||
token_data=token_data_cleaned
|
token_data=token_data_cleaned
|
||||||
)
|
)
|
||||||
logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Using context.output_base_path = {context.output_base_path} for final_path construction.") # Added
|
|
||||||
final_path = Path(context.output_base_path) / Path(relative_dir_path_str) / Path(output_filename)
|
final_path = Path(context.output_base_path) / Path(relative_dir_path_str) / Path(output_filename)
|
||||||
logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Constructed final_path = {final_path}") # Added
|
|
||||||
final_path.parent.mkdir(parents=True, exist_ok=True)
|
final_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
if final_path.exists() and not overwrite_existing:
|
if final_path.exists() and not overwrite_existing:
|
||||||
@ -257,12 +245,10 @@ class OutputOrganizationStage(ProcessingStage):
|
|||||||
token_data=base_token_data_cleaned
|
token_data=base_token_data_cleaned
|
||||||
)
|
)
|
||||||
# Destination: <output_base_path>/<asset_base_output_dir_str>/<extra_subdir_name>/<original_filename>
|
# Destination: <output_base_path>/<asset_base_output_dir_str>/<extra_subdir_name>/<original_filename>
|
||||||
logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Using context.output_base_path = {context.output_base_path} for final_dest_path construction.") # Added
|
|
||||||
final_dest_path = (Path(context.output_base_path) /
|
final_dest_path = (Path(context.output_base_path) /
|
||||||
Path(asset_base_output_dir_str) /
|
Path(asset_base_output_dir_str) /
|
||||||
Path(extra_subdir_name) /
|
Path(extra_subdir_name) /
|
||||||
source_file_path.name) # Use original filename
|
source_file_path.name) # Use original filename
|
||||||
logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Constructed final_dest_path = {final_dest_path}") # Added
|
|
||||||
|
|
||||||
final_dest_path.parent.mkdir(parents=True, exist_ok=True)
|
final_dest_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user