8 Commits

38 changed files with 1847 additions and 264 deletions

4
.gitignore vendored
View File

@@ -30,6 +30,6 @@ Thumbs.db
gui/__pycache__
__pycache__
Testfiles
Testfiles/ Testfiles/TestOutputs
Testfiles_

45
.roo/mcp.json Normal file
View File

@@ -0,0 +1,45 @@
{
"mcpServers": {
"conport": {
"command": "C:\\Users\\theis\\context-portal\\.venv\\Scripts\\python.exe",
"args": [
"C:\\Users\\theis\\context-portal\\src\\context_portal_mcp\\main.py",
"--mode",
"stdio",
"--workspace_id",
"${workspaceFolder}"
],
"alwaysAllow": [
"get_product_context",
"update_product_context",
"get_active_context",
"update_active_context",
"log_decision",
"get_decisions",
"search_decisions_fts",
"log_progress",
"get_progress",
"update_progress",
"delete_progress_by_id",
"log_system_pattern",
"get_system_patterns",
"log_custom_data",
"get_custom_data",
"delete_custom_data",
"search_project_glossary_fts",
"export_conport_to_markdown",
"import_markdown_to_conport",
"link_conport_items",
"search_custom_data_value_fts",
"get_linked_items",
"batch_log_items",
"get_item_history",
"delete_decision_by_id",
"delete_system_pattern_by_id",
"get_conport_schema",
"get_recent_activity_summary",
"semantic_search_conport"
]
}
}
}

3
.roomodes Normal file
View File

@@ -0,0 +1,3 @@
{
"customModes": []
}

112
AUTOTEST_GUI_PLAN.md Normal file
View File

@@ -0,0 +1,112 @@
# Plan for Autotest GUI Mode Implementation
**I. Objective:**
Create an `autotest.py` script that can launch the Asset Processor GUI headlessly, load a predefined asset (`.zip`), select a predefined preset, verify the predicted rule structure against an expected JSON, trigger processing to a predefined output directory, check the output, and analyze logs for errors or specific messages. This serves as a sanity check for core GUI-driven workflows.
**II. `TestFiles` Directory:**
A new directory named `TestFiles` will be created in the project root (`c:/Users/Theis/Assetprocessor/Asset-Frameworker/TestFiles/`). This directory will house:
* Sample asset `.zip` files for testing (e.g., `TestFiles/SampleAsset1.zip`).
* Expected rule structure JSON files (e.g., `TestFiles/SampleAsset1_PresetX_expected_rules.json`).
* A subdirectory for test outputs (e.g., `TestFiles/TestOutputs/`).
**III. `autotest.py` Script:**
1. **Location:** `c:/Users/Theis/Assetprocessor/Asset-Frameworker/autotest.py` (or `scripts/autotest.py`).
2. **Command-Line Arguments (with defaults pointing to `TestFiles/`):**
* `--zipfile`: Path to the test asset. Default: `TestFiles/default_test_asset.zip`.
* `--preset`: Name of the preset. Default: `DefaultTestPreset`.
* `--expectedrules`: Path to expected rules JSON. Default: `TestFiles/default_test_asset_rules.json`.
* `--outputdir`: Path for processing output. Default: `TestFiles/TestOutputs/DefaultTestOutput`.
* `--search` (optional): Log search term. Default: `None`.
* `--additional-lines` (optional): Context lines for log search. Default: `0`.
3. **Core Structure:**
* Imports necessary modules from the main application and PySide6.
* Adds project root to `sys.path` for imports.
* `AutoTester` class:
* **`__init__(self, app_instance: App)`:**
* Stores `app_instance` and `main_window`.
* Initializes `QEventLoop`.
* Connects `app_instance.all_tasks_finished` to `self._on_all_tasks_finished`.
* Loads expected rules from the `--expectedrules` file.
* **`run_test(self)`:** Orchestrates the test steps sequentially:
1. Load ZIP (`main_window.add_input_paths()`).
2. Select Preset (`main_window.preset_editor_widget.editor_preset_list.setCurrentItem()`).
3. Await Prediction (poll `main_window._pending_predictions` with a `QTimer`, managed via `QEventLoop`; a sketch of this waiting pattern follows this section).
4. Retrieve & Compare Rulelist:
* Get actual rules: `main_window.unified_model.get_all_source_rules()`.
* Convert actual rules to comparable dict (`_convert_rules_to_comparable()`).
* Compare with loaded expected rules (`_compare_rules()`). If mismatch, log and fail.
5. Start Processing (emit `main_window.start_backend_processing` with rules and output settings).
6. Await Processing (use `QEventLoop` waiting for `_on_all_tasks_finished`).
7. Check Output Path (verify existence of output dir, list contents, basic sanity checks like non-emptiness or presence of key asset folders).
8. Retrieve & Analyze Logs (`main_window.log_console.log_console_output.toPlainText()`, filter by `--search`, check for tracebacks).
9. Report result and call `cleanup_and_exit()`.
* **`_check_prediction_status(self)`:** Slot for prediction polling timer.
* **`_on_all_tasks_finished(self, processed_count, skipped_count, failed_count)`:** Slot for `App.all_tasks_finished` signal.
* **`_convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> dict`:** Converts `SourceRule` objects to the JSON structure defined below.
* **`_compare_rules(self, actual_rules_data: dict, expected_rules_data: dict) -> bool`:** Implements Option 1 comparison logic:
* Errors if an expected field is missing or its value mismatches.
* Logs (but doesn't error on) fields present in actual but not in expected.
* **`_process_and_display_logs(self, logs_text: str)`:** Handles log filtering/display.
* **`cleanup_and_exit(self, success=True)`:** Quits `QCoreApplication` and `sys.exit()`.
* `main()` function:
* Parses CLI arguments.
* Initializes `QApplication`.
* Instantiates `main.App()` (does *not* show the GUI).
* Instantiates `AutoTester(app_instance)`.
* Uses `QTimer.singleShot(0, tester.run_test)` to start the test.
* Runs `q_app.exec()`.
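The "await" steps above rely on a nested `QEventLoop` driven by a polling `QTimer`. A minimal, self-contained sketch of that waiting pattern (the helper name and the safety timeout are illustrative additions, not part of the planned script):
```python
# Sketch of the QTimer + QEventLoop waiting pattern: poll a condition on a timer
# and quit a nested event loop once it holds (or a safety timeout expires).
from PySide6.QtCore import QEventLoop, QTimer

def wait_until(condition, poll_ms: int = 500, timeout_ms: int = 60_000) -> bool:
    loop = QEventLoop()
    result = {"ok": False}

    def poll():
        if condition():
            result["ok"] = True
            loop.quit()

    timer = QTimer()
    timer.timeout.connect(poll)
    timer.start(poll_ms)
    QTimer.singleShot(timeout_ms, loop.quit)  # give up after timeout_ms
    loop.exec()  # keeps processing Qt events (prediction/processing signals) while waiting
    timer.stop()
    return result["ok"]
```
In `run_test()` this would be used as something like `wait_until(lambda: not main_window._pending_predictions)` before the rule comparison step.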
**IV. `expected_rules.json` Structure (Revised):**
Located in `TestFiles/`. Example: `TestFiles/SampleAsset1_PresetX_expected_rules.json`.
```json
{
"source_rules": [
{
"input_path": "SampleAsset1.zip",
"supplier_identifier": "ExpectedSupplier",
"preset_name": "PresetX",
"assets": [
{
"asset_name": "AssetNameFromPrediction",
"asset_type": "Prop",
"files": [
{
"file_path": "relative/path/to/file1.png",
"item_type": "MAP_COL",
"target_asset_name_override": null
}
]
}
]
}
]
}
```
**V. Mermaid Diagram of Autotest Flow:**
```mermaid
graph TD
A["Start autotest.py with CLI Args (defaults to TestFiles/)"] --> B{"Setup Args & Logging"};
B --> C["Init QApplication & main.App (GUI Headless)"];
C --> D["Instantiate AutoTester(app_instance)"];
D --> E["QTimer.singleShot -> AutoTester.run_test()"];
subgraph run_test ["AutoTester.run_test()"]
E --> F["Load Expected Rules from --expectedrules JSON"];
F --> G["Load ZIP (--zipfile) via main_window.add_input_paths()"];
G --> H["Select Preset (--preset) via main_window.preset_editor_widget"];
H --> I["Await Prediction (Poll main_window._pending_predictions via QTimer & QEventLoop)"];
I -- Prediction Done --> J["Get Actual Rules from main_window.unified_model"];
J --> K["Convert Actual Rules to Comparable JSON Structure"];
K --> L{"Compare Actual vs Expected Rules (Option 1 Logic)"};
L -- Match --> M["Start Processing (Emit main_window.start_backend_processing with --outputdir)"];
L -- Mismatch --> ZFAIL["Log Mismatch & Call cleanup_and_exit(False)"];
M --> N["Await Processing (QEventLoop for App.all_tasks_finished signal)"];
N -- Processing Done --> O["Check Output Dir (--outputdir): Exists? Not Empty? Key Asset Folders?"];
O --> P["Retrieve & Analyze Logs (Search, Tracebacks)"];
P --> Q["Log Test Success & Call cleanup_and_exit(True)"];
end
ZFAIL --> ZEND["AutoTester.cleanup_and_exit() -> QCoreApplication.quit() & sys.exit()"];
Q --> ZEND;
```

View File

@@ -16,6 +16,7 @@ This document outlines the key features of the Asset Processor Tool.
* Saves maps in appropriate formats (JPG, PNG, EXR) based on complex rules involving map type (`FORCE_LOSSLESS_MAP_TYPES`), resolution (`RESOLUTION_THRESHOLD_FOR_JPG`), bit depth, and source format.
* Calculates basic image statistics (Min/Max/Mean) for a reference resolution.
* Calculates and stores the relative aspect ratio change string in metadata (e.g., `EVEN`, `X150`, `Y125`).
* **Low-Resolution Fallback:** If enabled (`ENABLE_LOW_RESOLUTION_FALLBACK`), automatically saves an additional "LOWRES" variant of source images if their largest dimension is below a configurable threshold (`LOW_RESOLUTION_THRESHOLD`). This "LOWRES" variant uses the original image dimensions and is saved in addition to any standard resolution outputs.
* **Channel Merging:** Combines channels from different maps into packed textures (e.g., NRMRGH) based on preset rules (`MAP_MERGE_RULES` in `config.py`).
* **Metadata Generation:** Creates a `metadata.json` file for each asset containing details about maps, category, archetype, aspect ratio change, processing settings, etc.
* **Output Organization:** Creates a clean, structured output directory (`<output_base>/<supplier>/<asset_name>/`).

View File

@@ -13,6 +13,18 @@ The `app_settings.json` file is structured into several key sections, including:
* `ASSET_TYPE_DEFINITIONS`: Defines known asset types (like Surface, Model, Decal) and their properties.
* `MAP_MERGE_RULES`: Defines how multiple input maps can be merged into a single output map (e.g., combining Normal and Roughness into one).
### Low-Resolution Fallback Settings
These settings control the generation of low-resolution "fallback" variants for source images:
* `ENABLE_LOW_RESOLUTION_FALLBACK` (boolean, default: `true`):
* If `true`, the tool will generate an additional "LOWRES" variant for source images whose largest dimension is smaller than the `LOW_RESOLUTION_THRESHOLD`.
* This "LOWRES" variant uses the original dimensions of the source image and is saved in addition to any other standard resolution outputs (e.g., 1K, PREVIEW).
* If `false`, this feature is disabled.
* `LOW_RESOLUTION_THRESHOLD` (integer, default: `512`):
* Defines the pixel dimension (for the largest side of an image) below which the "LOWRES" fallback variant will be generated (if enabled).
* For example, if set to `512`, any source image whose largest side is below 512 pixels (e.g., 256x256 or 128x128) will have a "LOWRES" variant created.
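As a rough illustration, the decision these two settings control can be expressed as a small check (a sketch only; the helper name and the settings dictionary are assumptions, not the tool's actual implementation):
```python
# Sketch: should a "LOWRES" variant be generated for a given source image?
def needs_lowres_variant(width: int, height: int, settings: dict) -> bool:
    if not settings.get("ENABLE_LOW_RESOLUTION_FALLBACK", True):
        return False
    threshold = settings.get("LOW_RESOLUTION_THRESHOLD", 512)
    # The check applies to the largest side of the source image.
    return max(width, height) < threshold
```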
### LLM Predictor Settings
For users who wish to utilize the experimental LLM Predictor feature, the following settings are available in `config/llm_settings.json`:

View File

@@ -58,6 +58,7 @@ The `<output_base_directory>` (the root folder where processing output starts) i
Each asset directory contains the following:
* Processed texture maps (e.g., `WoodFloor_Albedo_4k.png`, `MetalPanel_Normal_2k.exr`). The exact filenames depend on the `OUTPUT_FILENAME_PATTERN`. These are the resized, format-converted, and bit-depth adjusted texture files.
* **LOWRES Variants:** If the "Low-Resolution Fallback" feature is enabled and a source image's dimensions are below the configured threshold, an additional variant with "LOWRES" as its resolution token (e.g., `MyTexture_COL_LOWRES.png`) will be saved. This variant uses the original dimensions of the source image.
* Merged texture maps (e.g., `WoodFloor_Combined_4k.png`). The exact filenames depend on the `OUTPUT_FILENAME_PATTERN`. These are maps created by combining channels from different source maps based on the configured merge rules.
* Model files (if present in the source asset).
* `metadata.json`: A JSON file containing detailed information about the asset and the processing that was performed. This includes details about the maps (resolutions, formats, bit depths, and for roughness maps, a `derived_from_gloss_filename: true` flag if it was inverted from an original gloss map), merged map details, calculated image statistics, aspect ratio change information, asset category and archetype, the source preset used, and a list of ignored source files. This file is intended for use by downstream tools or scripts (like the Blender integration scripts).
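For illustration, the example filenames above follow a token-based pattern; a hypothetical expansion could look like the following (the pattern string and token names are assumptions, not the repository's actual `OUTPUT_FILENAME_PATTERN` value):
```python
# Hypothetical filename pattern expansion; the tokens here are illustrative only.
pattern = "{asset_name}_{map_type}_{resolution}.{ext}"

def build_output_name(asset_name: str, map_type: str, resolution: str, ext: str) -> str:
    return pattern.format(asset_name=asset_name, map_type=map_type,
                          resolution=resolution, ext=ext)

print(build_output_name("MyTexture", "COL", "LOWRES", "png"))  # MyTexture_COL_LOWRES.png
print(build_output_name("WoodFloor", "Albedo", "4k", "png"))   # WoodFloor_Albedo_4k.png
```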

View File

@@ -0,0 +1,83 @@
# User Guide: Usage - Automated GUI Testing (`autotest.py`)
This document explains how to use the `autotest.py` script for automated sanity checks of the Asset Processor Tool's GUI-driven workflow.
## Overview
The `autotest.py` script provides a way to run predefined test scenarios headlessly (without displaying the GUI). It simulates the core user actions: loading an asset, selecting a preset, allowing rules to be predicted, processing the asset, and then checks the results against expectations. This is primarily intended as a developer tool for regression testing and ensuring core functionality remains stable.
## Running the Autotest Script
From the project root directory, you can run the script using Python:
```bash
python autotest.py [OPTIONS]
```
### Command-Line Options
The script accepts several command-line arguments to configure the test run. If not provided, they use predefined default values.
* `--zipfile PATH_TO_ZIP`:
* Specifies the path to the input asset `.zip` file to be used for the test.
* Default: `TestFiles/BoucleChunky001.zip`
* `--preset PRESET_NAME`:
* Specifies the name of the preset to be selected and used for rule prediction and processing.
* Default: `Dinesen`
* `--expectedrules PATH_TO_JSON`:
* Specifies the path to a JSON file containing the expected rule structure that should be generated after the preset is applied to the input asset.
* Default: `TestFiles/test-BoucleChunky001.json`
* `--outputdir PATH_TO_DIR`:
* Specifies the directory where the processed assets will be written.
* Default: `TestFiles/TestOutputs/DefaultTestOutput`
* `--search "SEARCH_TERM"` (optional):
* A string to search for within the application logs generated during the test run. If found, matching log lines (with context) will be highlighted.
* Default: None
* `--additional-lines NUM_LINES` (optional):
* When using `--search`, this specifies how many lines of context before and after each matching log line should be displayed.
* Default: `0`
**Example Usage:**
```bash
# Run with default test files and settings
python autotest.py
# Run with specific test files and search for a log message
python autotest.py --zipfile TestFiles/MySpecificAsset.zip --preset MyPreset --expectedrules TestFiles/MySpecificAsset_rules.json --outputdir TestFiles/TestOutputs/MySpecificOutput --search "Processing complete for asset"
```
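Conceptually, the `--search` / `--additional-lines` filtering behaves like the sketch below (case-insensitive matching and the function name are assumptions for illustration):
```python
# Sketch: return matching log lines plus N lines of surrounding context.
def filter_logs(logs_text: str, term: str, additional_lines: int = 0) -> list[str]:
    lines = logs_text.splitlines()
    keep: set[int] = set()
    for i, line in enumerate(lines):
        if term.lower() in line.lower():
            keep.update(range(max(0, i - additional_lines),
                              min(len(lines), i + additional_lines + 1)))
    return [lines[i] for i in sorted(keep)]
```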
## `TestFiles` Directory
The autotest script relies on a directory named `TestFiles` located in the project root. This directory should contain:
* **Test Asset `.zip` files:** The actual asset archives used as input for tests (e.g., `default_test_asset.zip`, `MySpecificAsset.zip`).
* **Expected Rules `.json` files:** JSON files defining the expected rule structure for a given asset and preset combination (e.g., `default_test_asset_rules.json`, `MySpecificAsset_rules.json`). The structure of this file is detailed in the main autotest plan (`AUTOTEST_GUI_PLAN.md`).
* **`TestOutputs/` subdirectory:** This is the default parent directory where the autotest script will create specific output folders for each test run (e.g., `TestFiles/TestOutputs/DefaultTestOutput/`).
## Test Workflow
When executed, `autotest.py` performs the following steps:
1. **Initialization:** Parses command-line arguments and initializes the main application components headlessly.
2. **Load Expected Rules:** Loads the `expected_rules.json` file.
3. **Load Asset:** Loads the specified `.zip` file into the application.
4. **Select Preset:** Selects the specified preset. This triggers the internal rule prediction process.
5. **Await Prediction:** Waits for the rule prediction to complete.
6. **Compare Rules:** Retrieves the predicted rules from the application and compares them against the loaded expected rules. If there's a mismatch, the test typically fails at this point.
7. **Start Processing:** If the rules match, it initiates the asset processing pipeline, directing output to the specified output directory.
8. **Await Processing:** Waits for all backend processing tasks to complete.
9. **Check Output:** Verifies the existence of the output directory and lists its contents. Basic checks ensure some output was generated.
10. **Analyze Logs:** Retrieves logs from the application. If a search term was provided, it filters and displays relevant log portions. It also checks for Python tracebacks, which usually indicate a failure.
11. **Report Result:** Prints a summary of the test outcome (success or failure) and exits with an appropriate status code (0 for success, 1 for failure).
## Interpreting Results
* **Console Output:** The script will log its progress and the results of each step to the console.
* **Log Analysis:** Pay attention to the log output, especially if a `--search` term was used or if any tracebacks are reported.
* **Exit Code:**
* `0`: Test completed successfully.
* `1`: Test failed at some point (e.g., rule mismatch, processing error, traceback found).
* **Output Directory:** Inspect the contents of the specified output directory to manually verify the processed assets if needed.
This automated test helps ensure the stability of the core processing logic when driven by GUI-equivalent actions.
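Because the script communicates its result through the exit code, it is straightforward to wire into a CI step or a wrapper script; a minimal sketch (this wrapper is not part of the repository):
```python
# Sketch: run the autotest from the project root and propagate its exit code.
import subprocess
import sys

result = subprocess.run([sys.executable, "autotest.py",
                         "--search", "Processing complete for asset"])
# 0 = success, 1 = failure (rule mismatch, processing error, or traceback found).
sys.exit(result.returncode)
```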

View File

@@ -12,6 +12,9 @@ The tool's configuration is loaded from several JSON files, providing a layered
1. **Application Settings (`config/app_settings.json`):** This JSON file defines the core global default settings, constants, and rules that apply generally across different asset sources (e.g., the global `OUTPUT_DIRECTORY_PATTERN` and `OUTPUT_FILENAME_PATTERN`, standard image resolutions, map merge rules, output format rules, Blender paths, temporary directory prefix, initial scaling mode, merge dimension mismatch strategy). See the [User Guide: Output Structure](../01_User_Guide/09_Output_Structure.md#available-tokens) for a list of available tokens for these patterns.
* *Note:* `ASSET_TYPE_DEFINITIONS` and `FILE_TYPE_DEFINITIONS` are no longer stored here; they have been moved to dedicated files.
* It also includes settings for new features like the "Low-Resolution Fallback":
* `ENABLE_LOW_RESOLUTION_FALLBACK` (boolean): Enables or disables the generation of "LOWRES" variants for small source images. Defaults to `true`.
* `LOW_RESOLUTION_THRESHOLD` (integer): The pixel dimension threshold (largest side) below which a "LOWRES" variant is created if the feature is enabled. Defaults to `512`.
2. **User Settings (`config/user_settings.json`):** This optional JSON file allows users to override specific settings defined in `config/app_settings.json`. If this file exists, its values for corresponding keys will take precedence over the base application settings. This file is primarily managed through the GUI's Application Preferences Editor.
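A minimal sketch of this layered load, assuming both files are plain JSON and that user keys simply replace base keys (the helper is illustrative, not the tool's actual loader):
```python
# Sketch: base application settings overridden by optional user settings.
import json
from pathlib import Path

def load_settings(config_dir: Path = Path("config")) -> dict:
    settings = json.loads((config_dir / "app_settings.json").read_text())
    user_file = config_dir / "user_settings.json"
    if user_file.exists():
        # Keys in user_settings.json take precedence over the base settings.
        settings.update(json.loads(user_file.read_text()))
    return settings
```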

View File

@@ -50,27 +50,44 @@ These stages are executed sequentially once for each asset before the core item
### Core Item Processing Loop
The [`PipelineOrchestrator`](processing/pipeline/orchestrator.py:36) iterates through the `context.processing_items` list (populated by the [`PrepareProcessingItemsStage`](processing/pipeline/stages/prepare_processing_items.py:10)). For each item (either a [`FileRule`](rule_structure.py:5) for a regular map or a [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16) for a merged map), the following stages are executed sequentially: The [`PipelineOrchestrator`](processing/pipeline/orchestrator.py:36) iterates through the `context.processing_items` list (populated by the [`PrepareProcessingItemsStage`](processing/pipeline/stages/prepare_processing_items.py:10)). Each `item` in this list is now either a [`ProcessingItem`](rule_structure.py:0) (representing a specific variant of a source map, e.g., Color at 1K, or Color at LOWRES) or a [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16).
1. **[`PrepareProcessingItemsStage`](processing/pipeline/stages/prepare_processing_items.py:10)** (`processing/pipeline/stages/prepare_processing_items.py`):
* **Responsibility**: (Executed once before the loop) Creates the `context.processing_items` list by combining [`FileRule`](rule_structure.py:5)s from `context.files_to_process` and [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16)s derived from the global `map_merge_rules` configuration. It correctly accesses `map_merge_rules` from `context.config_obj` and validates each merge rule for the presence of `output_map_type` and a dictionary for `inputs`. Initializes `context.intermediate_results`. * **Responsibility**: (Executed once before the loop) This stage is now responsible for "exploding" each relevant [`FileRule`](rule_structure.py:5) into one or more [`ProcessingItem`](rule_structure.py:0) objects.
* **Context Interaction**: Reads from `context.files_to_process` and `context.config_obj` (accessing `map_merge_rules`). Populates `context.processing_items` and initializes `context.intermediate_results`. * For each [`FileRule`](rule_structure.py:5) that represents an image map:
* It loads the source image data and determines its original dimensions and bit depth.
* It creates standard [`ProcessingItem`](rule_structure.py:0)s for each required output resolution (e.g., "1K", "PREVIEW"), populating them with a copy of the source image data and the respective `resolution_key`.
* If the "Low-Resolution Fallback" feature is enabled (`ENABLE_LOW_RESOLUTION_FALLBACK` in config) and the source image's largest dimension is below `LOW_RESOLUTION_THRESHOLD`, it creates an additional [`ProcessingItem`](rule_structure.py:0) with `resolution_key="LOWRES"`, using the original image data and dimensions.
* It also adds [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16)s derived from global `map_merge_rules`.
* **Context Interaction**: Reads `context.files_to_process` and `context.config_obj`. Populates `context.processing_items` with a list of [`ProcessingItem`](rule_structure.py:0) and [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16) objects. Initializes `context.intermediate_results`.
2. **[`RegularMapProcessorStage`](processing/pipeline/stages/regular_map_processor.py:18)** (`processing/pipeline/stages/regular_map_processor.py`): For each `item` in `context.processing_items`:
* **Responsibility**: (Executed per [`FileRule`](rule_structure.py:5) item) Checks if the `FileRule.item_type` starts with "MAP_". If not, the item is skipped. Otherwise, it loads the image data for the file, determines its potentially suffixed internal map type (e.g., "MAP_COL-1"), applies in-memory transformations (Gloss-to-Rough, Normal Green Invert) using the shared utility function [`apply_common_map_transformations`](processing/utils/image_processing_utils.py), and returns the processed image data and details in a [`ProcessedRegularMapData`](processing/pipeline/asset_context.py:23) object. The `internal_map_type` in the output reflects any transformations (e.g., "MAP_GLOSS" becomes "MAP_ROUGH").
* **Context Interaction**: Reads from the input [`FileRule`](rule_structure.py:5) (checking `item_type`) and [`Configuration`](configuration.py:68). Returns a [`ProcessedRegularMapData`](processing/pipeline/asset_context.py:23) object which is stored in `context.intermediate_results`. 2. **Transformations (Implicit or via a dedicated stage - formerly `RegularMapProcessorStage` logic):**
* **Responsibility**: If the `item` is a [`ProcessingItem`](rule_structure.py:0), its `image_data` (loaded by `PrepareProcessingItemsStage`) may need transformations (Gloss-to-Rough, Normal Green Invert). This logic, previously in `RegularMapProcessorStage`, might be integrated into `PrepareProcessingItemsStage` before `ProcessingItem` creation, or handled by a new dedicated transformation stage that operates on `ProcessingItem.image_data`. The `item.map_type_identifier` would be updated if a transformation like Gloss-to-Rough occurs.
* **Context Interaction**: Modifies `item.image_data` and `item.map_type_identifier` within the [`ProcessingItem`](rule_structure.py:0) object.
3. **[`MergedTaskProcessorStage`](processing/pipeline/stages/merged_task_processor.py:68)** (`processing/pipeline/stages/merged_task_processor.py`):
* **Responsibility**: (Executed per [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16) item) Validates that all input map types specified in the merge rule start with "MAP_". If not, the task is failed. It dynamically loads input images by looking up the required input map types (e.g., "MAP_NRM") in `context.processed_maps_details` and using the temporary file paths from their `saved_files_info`. It applies in-memory transformations to inputs using [`apply_common_map_transformations`](processing/utils/image_processing_utils.py), handles dimension mismatches (with fallback creation if configured and `source_dimensions` are available), performs the channel merging operation, and returns the merged image data and details in a [`ProcessedMergedMapData`](processing/pipeline/asset_context.py:35) object. The `output_map_type` of the merged map must also be "MAP_" prefixed in the configuration. * **Responsibility**: (Executed if `item` is a [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16)) Same as before: validates inputs, loads source map data (likely from `ProcessingItem`s in `context.processing_items` or a cache populated from them), applies transformations, merges channels, and returns [`ProcessedMergedMapData`](processing/pipeline/asset_context.py:35).
* **Context Interaction**: Reads from the input [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16) (checking input map types), `context.workspace_path`, `context.processed_maps_details` (for input image data), and [`Configuration`](configuration.py:68). Returns a [`ProcessedMergedMapData`](processing/pipeline/asset_context.py:35) object which is stored in `context.intermediate_results`. * **Context Interaction**: Reads [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16), potentially `context.processing_items` (or a cache derived from it) for input image data. Returns [`ProcessedMergedMapData`](processing/pipeline/asset_context.py:35).
4. **[`InitialScalingStage`](processing/pipeline/stages/initial_scaling.py:14)** (`processing/pipeline/stages/initial_scaling.py`):
* **Responsibility**: (Executed per item) Applies initial scaling (e.g., Power-of-Two downscaling) to the image data from the previous processing stage based on the `initial_scaling_mode` configuration. * **Responsibility**: (Executed per item)
* **Context Interaction**: Takes a [`InitialScalingInput`](processing/pipeline/asset_context.py:46) (containing image data and config) and returns an [`InitialScalingOutput`](processing/pipeline/asset_context.py:54) object, which updates the item's entry in `context.intermediate_results`. * If `item` is a [`ProcessingItem`](rule_structure.py:0): Takes `item.image_data`, `item.current_dimensions`, and `item.resolution_key` as input. If `item.resolution_key` is "LOWRES", POT scaling is skipped. Otherwise, applies POT scaling if configured.
* If `item` is from a `MergeTaskDefinition` (i.e., `processed_data` from `MergedTaskProcessorStage`): Applies POT scaling as before.
* **Context Interaction**: Takes [`InitialScalingInput`](processing/pipeline/asset_context.py:46) (now including `resolution_key`). Returns [`InitialScalingOutput`](processing/pipeline/asset_context.py:54) (also including `resolution_key`), which updates `context.intermediate_results`. The `current_image_data` and `current_dimensions` for saving are taken from this output.
5. **[`SaveVariantsStage`](processing/pipeline/stages/save_variants.py:15)** (`processing/pipeline/stages/save_variants.py`):
* **Responsibility**: (Executed per item) Takes the final processed image data (potentially scaled) and configuration, and calls a utility to save the image to temporary files in various resolutions and formats as defined by the configuration. * **Responsibility**: (Executed per item) Saves the (potentially scaled) `current_image_data`.
* **Context Interaction**: Takes a [`SaveVariantsInput`](processing/pipeline/asset_context.py:61) object (which includes the "MAP_" prefixed `internal_map_type`). It uses the `get_filename_friendly_map_type` utility to convert this to a "standard type" (e.g., "COL") for output naming. Returns a [`SaveVariantsOutput`](processing/pipeline/asset_context.py:79) object containing details about the saved temporary files. The orchestrator stores these details, including the original "MAP_" prefixed `internal_map_type`, in `context.processed_maps_details` for the item. * **Context Interaction**:
* Takes [`SaveVariantsInput`](processing/pipeline/asset_context.py:61).
* `internal_map_type` is set from `item.map_type_identifier` (for `ProcessingItem`) or `processed_data.output_map_type` (for merged).
* `output_filename_pattern_tokens['resolution']` is set to the `resolution_key` obtained from `scaled_data_output.resolution_key` (which originates from `item.resolution_key` for `ProcessingItem`s, or is `None` for merged items that get all standard resolutions).
* `image_resolutions` argument for `SaveVariantsInput`:
* If `resolution_key == "LOWRES"`: Set to `{"LOWRES": width_of_lowres_data}`.
* If `resolution_key` is a standard key (e.g., "1K"): Set to `{resolution_key: configured_dimension}`.
* For merged items (where `resolution_key` from scaling is likely `None`): Set to the full `config.image_resolutions` map to generate all applicable standard sizes.
* Returns [`SaveVariantsOutput`](processing/pipeline/asset_context.py:79). Orchestrator stores details in `context.processed_maps_details`.
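To make the per-item expansion concrete, here is a minimal sketch of the "explode" step performed by the [`PrepareProcessingItemsStage`](processing/pipeline/stages/prepare_processing_items.py:10). The simplified `ProcessingItem` dataclass, the `load_image` callable, and the config attribute names are assumptions for illustration only:
```python
# Illustrative sketch only: expand one image FileRule into per-resolution
# ProcessingItems, plus an optional LOWRES variant at the original dimensions.
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple

@dataclass
class ProcessingItem:  # simplified stand-in for the real rule_structure class
    image_data: Any
    current_dimensions: Tuple[int, int]
    resolution_key: str
    map_type_identifier: str

def explode_file_rule(file_rule, config, load_image: Callable) -> List[ProcessingItem]:
    image_data, (width, height) = load_image(file_rule.file_path)
    items: List[ProcessingItem] = []
    # One item per configured standard resolution (e.g. "1K", "PREVIEW").
    for resolution_key in config.image_resolutions:
        items.append(ProcessingItem(image_data.copy(), (width, height),
                                    resolution_key, file_rule.item_type))
    # Extra LOWRES item for small source images, keeping the original dimensions.
    if (config.ENABLE_LOW_RESOLUTION_FALLBACK
            and max(width, height) < config.LOW_RESOLUTION_THRESHOLD):
        items.append(ProcessingItem(image_data.copy(), (width, height),
                                    "LOWRES", file_rule.item_type))
    return items
```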
### Post-Item Stages

Binary file not shown.

View File

@@ -0,0 +1,57 @@
{
"source_rules": [
{
"input_path": "BoucleChunky001.zip",
"supplier_identifier": "Dinesen",
"preset_name": null,
"assets": [
{
"asset_name": "BoucleChunky001",
"asset_type": "Surface",
"files": [
{
"file_path": "BoucleChunky001_AO_1K_METALNESS.png",
"item_type": "MAP_AO",
"target_asset_name_override": "BoucleChunky001"
},
{
"file_path": "BoucleChunky001_COL_1K_METALNESS.png",
"item_type": "MAP_COL",
"target_asset_name_override": "BoucleChunky001"
},
{
"file_path": "BoucleChunky001_DISP16_1K_METALNESS.png",
"item_type": "MAP_DISP",
"target_asset_name_override": "BoucleChunky001"
},
{
"file_path": "BoucleChunky001_DISP_1K_METALNESS.png",
"item_type": "EXTRA",
"target_asset_name_override": "BoucleChunky001"
},
{
"file_path": "BoucleChunky001_Fabric.png",
"item_type": "EXTRA",
"target_asset_name_override": "BoucleChunky001"
},
{
"file_path": "BoucleChunky001_METALNESS_1K_METALNESS.png",
"item_type": "MAP_METAL",
"target_asset_name_override": "BoucleChunky001"
},
{
"file_path": "BoucleChunky001_NRM_1K_METALNESS.png",
"item_type": "MAP_NRM",
"target_asset_name_override": "BoucleChunky001"
},
{
"file_path": "BoucleChunky001_ROUGHNESS_1K_METALNESS.png",
"item_type": "MAP_ROUGH",
"target_asset_name_override": "BoucleChunky001"
}
]
}
]
}
]
}

855
autotest.py Normal file
View File

@@ -0,0 +1,855 @@
import argparse
import sys
import logging
import logging.handlers
import time
import json
import shutil # Import shutil for directory operations
from pathlib import Path
from typing import List, Dict, Any
from PySide6.QtCore import QCoreApplication, QTimer, Slot, QEventLoop, QObject, Signal
from PySide6.QtWidgets import QApplication, QListWidgetItem
# Add project root to sys.path
project_root = Path(__file__).resolve().parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
try:
from main import App
from gui.main_window import MainWindow
from rule_structure import SourceRule # Assuming SourceRule is in rule_structure.py
except ImportError as e:
print(f"Error importing project modules: {e}")
print(f"Ensure that the script is run from the project root or that the project root is in PYTHONPATH.")
print(f"Current sys.path: {sys.path}")
sys.exit(1)
# Global variable for the memory log handler
autotest_memory_handler = None
# Custom Log Filter for Concise Output
class InfoSummaryFilter(logging.Filter):
# Keywords that identify INFO messages to *allow* for concise output
SUMMARY_KEYWORDS_PRECISE = [
"Test run completed",
"Test succeeded",
"Test failed",
"Rule comparison successful",
"Rule comparison failed",
"ProcessingEngine finished. Summary:",
"Autotest Context:",
"Parsed CLI arguments:",
"Prediction completed successfully.",
"Processing completed.",
"Signal 'all_tasks_finished' received",
"final status:", # To catch "Asset '...' final status:"
"User settings file not found:",
"MainPanelWidget: Default output directory set to:",
# Search related (as per original filter)
"Searching logs for term",
"Search term ",
"Found ",
"No tracebacks found in the logs.",
"--- End Log Analysis ---",
"Log analysis completed.",
]
# Patterns for case-insensitive rejection
REJECT_PATTERNS_LOWER = [
# Original debug prefixes (ensure these are still relevant or merge if needed)
"debug:", "orchestrator_trace:", "configuration_debug:", "app_debug:", "output_org_debug:",
# Iterative / Per-item / Per-file details / Intermediate steps
": item ", # Catches "Asset '...', Item X/Y"
"item successfully processed and saved",
", file '", # Catches "Asset '...', File '...'"
": processing regular map",
": found source file:",
": determined source bit depth:",
"successfully processed regular map",
"successfully created mergetaskdefinition",
": preparing processing items",
": finished preparing items. found",
": starting core item processing loop",
", task '",
": processing merge task",
"loaded from context:",
"using dimensions from first loaded input",
"successfully merged inputs into image",
"successfully processed merge task",
"mergedtaskprocessorstage result",
"calling savevariantsstage",
"savevariantsstage result",
"adding final details to context",
": finished core item processing loop",
": copied variant",
": copied extra file",
": successfully organized",
": output organization complete.",
": metadata saved to",
"worker thread: starting processing for rule:",
"preparing workspace for input:",
"input is a supported archive",
"calling processingengine.process with rule",
"calculated sha5 for",
"calculated next incrementing value for",
"verify: processingengine.process called",
": effective supplier set to",
": metadata initialized.",
"path",
"\\asset_processor",
": file rules queued for processing",
"successfully loaded base application settings",
"successfully loaded and merged asset_type_definitions",
"successfully loaded and merged file_type_definitions",
"starting rule-based prediction for:",
"rule-based prediction finished successfully for",
"finished rule-based prediction run for",
"updating model with rule-based results for source:",
"debug task ",
"worker thread: finished processing for rule:",
"task finished signal received for",
# Autotest step markers (not global summaries)
]
def filter(self, record):
# Allow CRITICAL, ERROR, WARNING unconditionally
if record.levelno >= logging.WARNING:
return True
if record.levelno == logging.INFO:
msg = record.getMessage()
msg_lower = msg.lower() # For case-insensitive pattern rejection
# 1. Explicitly REJECT if message contains verbose patterns (case-insensitive)
for pattern in self.REJECT_PATTERNS_LOWER: # Use the new list
if pattern in msg_lower:
return False # Reject
# 2. Then, if not rejected, ALLOW only if message contains precise summary keywords
for keyword in self.SUMMARY_KEYWORDS_PRECISE: # Use the new list
if keyword in msg: # Original message for case-sensitive summary keywords if needed
return True # Allow
# 3. Reject all other INFO messages that don't match precise summary keywords
return False
# Reject levels below INFO (e.g., DEBUG) by default for this handler
return False
# --- Root Logger Configuration for Concise Console Output ---
def setup_autotest_logging():
"""
Configures the root logger for concise console output for autotest.py.
This ensures that only essential summary information, warnings, and errors
are displayed on the console by default.
"""
root_logger = logging.getLogger()
# 1. Remove all existing handlers from the root logger.
# This prevents interference from other logging configurations.
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
handler.close() # Close handler before removing
# 2. Set the root logger's level to DEBUG to capture everything for the memory handler.
# The console handler will still filter down to INFO/selected.
root_logger.setLevel(logging.DEBUG) # Changed from INFO to DEBUG
# 3. Create a new StreamHandler for sys.stdout (for concise console output).
console_handler = logging.StreamHandler(sys.stdout)
# 4. Set this console handler's level to INFO.
# The filter will then decide which INFO messages to display on console.
console_handler.setLevel(logging.INFO)
# 5. Apply the enhanced InfoSummaryFilter to the console handler.
info_filter = InfoSummaryFilter()
console_handler.addFilter(info_filter)
# 6. Set a concise formatter for the console handler.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
console_handler.setFormatter(formatter)
# 7. Add this newly configured console handler to the root_logger.
root_logger.addHandler(console_handler)
# 8. Setup the MemoryHandler
global autotest_memory_handler # Declare usage of global
autotest_memory_handler = logging.handlers.MemoryHandler(
capacity=20000, # Increased capacity
flushLevel=logging.CRITICAL + 1, # Prevent automatic flushing
target=None # Does not flush to another handler
)
autotest_memory_handler.setLevel(logging.DEBUG) # Capture all logs from DEBUG up
# Not adding a formatter here, will format in _process_and_display_logs
# 9. Add the memory handler to the root logger.
root_logger.addHandler(autotest_memory_handler)
# Call the setup function early in the script's execution.
setup_autotest_logging()
# Logger for autotest.py's own messages.
# Messages from this logger will propagate to the root logger and be filtered
# by the console_handler configured above.
# Setting its level to DEBUG allows autotest.py to generate DEBUG messages,
# which won't appear on the concise console (due to handler's INFO level)
# but can be captured by other handlers (e.g., the GUI's log console).
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Ensure autotest.py can generate DEBUGs for other handlers
# Note: The GUI's log console (e.g., self.main_window.log_console.log_console_output)
# is assumed to capture all logs (including DEBUG) from various modules.
# The _process_and_display_logs function then uses these comprehensive logs for the --search feature.
# This root logger setup primarily makes autotest.py's direct console output concise,
# ensuring that only filtered, high-level information appears on stdout by default.
# --- End of Root Logger Configuration ---
# --- Argument Parsing ---
def parse_arguments():
"""Parses command-line arguments for the autotest script."""
parser = argparse.ArgumentParser(description="Automated test script for Asset Processor GUI.")
parser.add_argument(
"--zipfile",
type=Path,
default=project_root / "TestFiles" / "BoucleChunky001.zip",
help="Path to the test asset ZIP file. Default: TestFiles/BoucleChunky001.zip"
)
parser.add_argument(
"--preset",
type=str,
default="Dinesen", # This should match a preset name in the application
help="Name of the preset to use. Default: Dinesen"
)
parser.add_argument(
"--expectedrules",
type=Path,
default=project_root / "TestFiles" / "Test-BoucleChunky001.json",
help="Path to the JSON file with expected rules. Default: TestFiles/Test-BoucleChunky001.json"
)
parser.add_argument(
"--outputdir",
type=Path,
default=project_root / "TestFiles" / "TestOutputs" / "BoucleChunkyOutput",
help="Path for processing output. Default: TestFiles/TestOutputs/BoucleChunkyOutput"
)
parser.add_argument(
"--search",
type=str,
default=None,
help="Optional log search term. Default: None"
)
parser.add_argument(
"--additional-lines",
type=int,
default=0,
help="Context lines for log search. Default: 0"
)
return parser.parse_args()
class AutoTester(QObject):
"""
Handles the automated testing process for the Asset Processor GUI.
"""
# Define signals if needed, e.g., for specific test events
# test_step_completed = Signal(str)
def __init__(self, app_instance: App, cli_args: argparse.Namespace):
super().__init__()
self.app_instance: App = app_instance
self.main_window: MainWindow = app_instance.main_window
self.cli_args: argparse.Namespace = cli_args
self.event_loop = QEventLoop(self)
self.prediction_poll_timer = QTimer(self)
self.expected_rules_data: Dict[str, Any] = {}
self.test_step: str = "INIT" # Possible values: INIT, LOADING_ZIP, SELECTING_PRESET, AWAITING_PREDICTION, PREDICTION_COMPLETE, COMPARING_RULES, STARTING_PROCESSING, AWAITING_PROCESSING, PROCESSING_COMPLETE, CHECKING_OUTPUT, ANALYZING_LOGS, DONE
if not self.main_window:
logger.error("MainWindow instance not found in App. Cannot proceed.")
self.cleanup_and_exit(success=False)
return
# Connect signals
# Note: on an instance a Signal is exposed as a SignalInstance, so check for a
# connectable attribute instead of isinstance(..., Signal).
if hasattr(self.app_instance, 'all_tasks_finished') and hasattr(self.app_instance.all_tasks_finished, 'connect'):
self.app_instance.all_tasks_finished.connect(self._on_all_tasks_finished)
else:
logger.warning("App instance does not expose a connectable 'all_tasks_finished' signal. Processing completion might not be detected.")
self._load_expected_rules()
def _load_expected_rules(self) -> None:
"""Loads the expected rules from the JSON file specified by cli_args."""
self.test_step = "LOADING_EXPECTED_RULES"
logger.debug(f"Loading expected rules from: {self.cli_args.expectedrules}")
try:
with open(self.cli_args.expectedrules, 'r') as f:
self.expected_rules_data = json.load(f)
logger.debug("Expected rules loaded successfully.")
except FileNotFoundError:
logger.error(f"Expected rules file not found: {self.cli_args.expectedrules}")
self.cleanup_and_exit(success=False)
except json.JSONDecodeError as e:
logger.error(f"Error decoding expected rules JSON: {e}")
self.cleanup_and_exit(success=False)
except Exception as e:
logger.error(f"An unexpected error occurred while loading expected rules: {e}")
self.cleanup_and_exit(success=False)
def run_test(self) -> None:
"""Orchestrates the test steps."""
logger.info("Starting test run...")
if not self.expected_rules_data: # Ensure rules were loaded
logger.error("Expected rules not loaded. Aborting test.")
self.cleanup_and_exit(success=False)
return
# Add a specific summary log for essential context
logger.info(f"Autotest Context: Input='{self.cli_args.zipfile.name}', Preset='{self.cli_args.preset}', Output='{self.cli_args.outputdir}'")
# Step 1: Load ZIP
self.test_step = "LOADING_ZIP"
logger.info(f"Step 1: Loading ZIP file: {self.cli_args.zipfile}") # KEEP INFO - Passes filter
if not self.cli_args.zipfile.exists():
logger.error(f"ZIP file not found: {self.cli_args.zipfile}")
self.cleanup_and_exit(success=False)
return
try:
# Assuming add_input_paths can take a list of strings or Path objects
self.main_window.add_input_paths([str(self.cli_args.zipfile)])
logger.debug("ZIP file loading initiated.")
except Exception as e:
logger.error(f"Error during ZIP file loading: {e}")
self.cleanup_and_exit(success=False)
return
# Step 2: Select Preset
self.test_step = "SELECTING_PRESET"
logger.info(f"Step 2: Selecting preset: {self.cli_args.preset}") # KEEP INFO - Passes filter
preset_found = False
preset_list_widget = self.main_window.preset_editor_widget.editor_preset_list
for i in range(preset_list_widget.count()):
item = preset_list_widget.item(i)
if item and item.text() == self.cli_args.preset:
preset_list_widget.setCurrentItem(item)
logger.debug(f"Preset '{self.cli_args.preset}' selected.")
preset_found = True
break
if not preset_found:
logger.error(f"Preset '{self.cli_args.preset}' not found in the list.")
available_presets = [preset_list_widget.item(i).text() for i in range(preset_list_widget.count())]
logger.debug(f"Available presets: {available_presets}")
self.cleanup_and_exit(success=False)
return
# Step 3: Await Prediction Completion
self.test_step = "AWAITING_PREDICTION"
logger.debug("Step 3: Awaiting prediction completion...")
self.prediction_poll_timer.timeout.connect(self._check_prediction_status)
self.prediction_poll_timer.start(500) # Poll every 500ms
# Use a QTimer to allow event loop to process while waiting for this step
# This ensures that the _check_prediction_status can be called.
# We will exit this event_loop from _check_prediction_status when prediction is done.
logger.debug("Starting event loop for prediction...")
self.event_loop.exec() # This loop is quit by _check_prediction_status
self.prediction_poll_timer.stop()
logger.debug("Event loop for prediction finished.")
if self.test_step != "PREDICTION_COMPLETE":
logger.error(f"Prediction did not complete as expected. Current step: {self.test_step}")
# Check if there were any pending predictions that never cleared
if hasattr(self.main_window, '_pending_predictions'):
logger.error(f"Pending predictions at timeout: {self.main_window._pending_predictions}")
self.cleanup_and_exit(success=False)
return
logger.info("Prediction completed successfully.") # KEEP INFO - Passes filter
# Step 4: Retrieve & Compare Rulelist
self.test_step = "COMPARING_RULES"
logger.info("Step 4: Retrieving and Comparing Rules...") # KEEP INFO - Passes filter
actual_source_rules_list: List[SourceRule] = self.main_window.unified_model.get_all_source_rules()
actual_rules_obj = actual_source_rules_list # Keep the SourceRule list for processing
comparable_actual_rules = self._convert_rules_to_comparable(actual_source_rules_list)
if not self._compare_rules(comparable_actual_rules, self.expected_rules_data):
logger.error("Rule comparison failed. See logs for details.")
self.cleanup_and_exit(success=False)
return
logger.info("Rule comparison successful.") # KEEP INFO - Passes filter
# Step 5: Start Processing
self.test_step = "START_PROCESSING"
logger.info("Step 5: Starting Processing...") # KEEP INFO - Passes filter
processing_settings = {
"output_dir": str(self.cli_args.outputdir), # Ensure it's a string for JSON/config
"overwrite": True,
"workers": 1,
"blender_enabled": False # Basic test, no Blender
}
try:
Path(self.cli_args.outputdir).mkdir(parents=True, exist_ok=True)
logger.debug(f"Ensured output directory exists: {self.cli_args.outputdir}")
except Exception as e:
logger.error(f"Could not create output directory {self.cli_args.outputdir}: {e}")
self.cleanup_and_exit(success=False)
return
if hasattr(self.main_window, 'start_backend_processing') and hasattr(self.main_window.start_backend_processing, 'emit'):  # signal instances expose emit(); isinstance(..., Signal) would be False here
logger.debug(f"Emitting start_backend_processing with rules count: {len(actual_rules_obj)} and settings: {processing_settings}")
self.main_window.start_backend_processing.emit(actual_rules_obj, processing_settings)
else:
logger.error("'start_backend_processing' signal not found on MainWindow. Cannot start processing.")
self.cleanup_and_exit(success=False)
return
# Step 6: Await Processing Completion
self.test_step = "AWAIT_PROCESSING"
logger.debug("Step 6: Awaiting processing completion...")
self.event_loop.exec() # This loop is quit by _on_all_tasks_finished
if self.test_step != "PROCESSING_COMPLETE":
logger.error(f"Processing did not complete as expected. Current step: {self.test_step}")
self.cleanup_and_exit(success=False)
return
logger.info("Processing completed.") # KEEP INFO - Passes filter
# Step 7: Check Output Path
self.test_step = "CHECK_OUTPUT"
logger.info(f"Step 7: Checking output path: {self.cli_args.outputdir}") # KEEP INFO - Passes filter
output_path = Path(self.cli_args.outputdir)
if not output_path.exists() or not output_path.is_dir():
logger.error(f"Output directory {output_path} does not exist or is not a directory.")
self.cleanup_and_exit(success=False)
return
output_items = list(output_path.iterdir())
if not output_items:
logger.warning(f"Output directory {output_path} is empty. This might be a test failure depending on the case.")
# For a more specific check, one might iterate through actual_rules_obj
# and verify if subdirectories matching asset_name exist.
# e.g. for asset_rule in source_rule.assets:
# expected_asset_dir = output_path / asset_rule.asset_name
# if not expected_asset_dir.is_dir(): logger.error(...)
else:
logger.debug(f"Found {len(output_items)} item(s) in output directory:")
for item in output_items:
logger.debug(f" - {item.name} ({'dir' if item.is_dir() else 'file'})")
logger.info("Output path check completed.") # KEEP INFO - Passes filter
# Step 8: Retrieve & Analyze Logs
self.test_step = "CHECK_LOGS"
logger.debug("Step 8: Retrieving and Analyzing Logs...")
all_logs_text = ""
if self.main_window.log_console and self.main_window.log_console.log_console_output:
all_logs_text = self.main_window.log_console.log_console_output.toPlainText()
else:
logger.warning("Log console or output widget not found. Cannot retrieve logs.")
self._process_and_display_logs(all_logs_text)
logger.info("Log analysis completed.")
# Final Step
logger.info("Test run completed successfully.") # KEEP INFO - Passes filter
self.cleanup_and_exit(success=True)
@Slot()
def _check_prediction_status(self) -> None:
"""Polls the main window for pending predictions."""
# logger.debug(f"Checking prediction status. Pending: {self.main_window._pending_predictions if hasattr(self.main_window, '_pending_predictions') else 'N/A'}")
if hasattr(self.main_window, '_pending_predictions'):
if not self.main_window._pending_predictions: # Assuming _pending_predictions is a list/dict that's empty when done
logger.debug("No pending predictions. Prediction assumed complete.")
self.test_step = "PREDICTION_COMPLETE"
if self.event_loop.isRunning():
self.event_loop.quit()
# else:
# logger.debug(f"Still awaiting predictions: {len(self.main_window._pending_predictions)} remaining.")
else:
logger.warning("'_pending_predictions' attribute not found on MainWindow. Cannot check prediction status automatically.")
# As a fallback, if the attribute is missing, we might assume prediction is instant or needs manual check.
# For now, let's assume it means it's done if the attribute is missing, but this is risky.
# A better approach would be to have a clear signal from MainWindow when predictions are done.
self.test_step = "PREDICTION_COMPLETE" # Risky assumption
if self.event_loop.isRunning():
self.event_loop.quit()
@Slot(int, int, int)
def _on_all_tasks_finished(self, processed_count: int, skipped_count: int, failed_count: int) -> None:
"""Slot for App.all_tasks_finished signal."""
logger.info(f"Signal 'all_tasks_finished' received: Processed={processed_count}, Skipped={skipped_count}, Failed={failed_count}") # KEEP INFO - Passes filter
if self.test_step == "AWAIT_PROCESSING":
logger.debug("Processing completion signal received.") # Covered by the summary log above
if failed_count > 0:
logger.error(f"Processing finished with {failed_count} failed task(s).")
# Even if tasks failed, the test might pass based on output checks.
# The error is logged for information.
self.test_step = "PROCESSING_COMPLETE"
if self.event_loop.isRunning():
self.event_loop.quit()
else:
logger.warning(f"Signal 'all_tasks_finished' received at an unexpected test step: '{self.test_step}'. Counts: P={processed_count}, S={skipped_count}, F={failed_count}")
def _convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> Dict[str, Any]:
"""
Converts a list of SourceRule objects to a dictionary structure
suitable for comparison with the expected_rules.json.
"""
logger.debug(f"Converting {len(source_rules_list)} SourceRule objects to comparable dictionary...")
comparable_sources_list = []
for source_rule_obj in source_rules_list:
comparable_asset_list = []
# source_rule_obj.assets is List[AssetRule]
for asset_rule_obj in source_rule_obj.assets:
comparable_file_list = []
# asset_rule_obj.files is List[FileRule]
for file_rule_obj in asset_rule_obj.files:
comparable_file_list.append({
"file_path": file_rule_obj.file_path,
"item_type": file_rule_obj.item_type,
"target_asset_name_override": file_rule_obj.target_asset_name_override
})
comparable_asset_list.append({
"asset_name": asset_rule_obj.asset_name,
"asset_type": asset_rule_obj.asset_type,
"files": comparable_file_list
})
comparable_sources_list.append({
"input_path": Path(source_rule_obj.input_path).name, # Use only the filename
"supplier_identifier": source_rule_obj.supplier_identifier,
"preset_name": source_rule_obj.preset_name,
"assets": comparable_asset_list
})
logger.debug("Conversion to comparable dictionary finished.")
return {"source_rules": comparable_sources_list}
def _compare_rule_item(self, actual_item: Dict[str, Any], expected_item: Dict[str, Any], item_type_name: str, parent_context: str = "") -> bool:
"""
Recursively compares an individual actual rule item dictionary with an expected rule item dictionary.
Logs differences and returns True if they match, False otherwise.
"""
item_match = True
identifier = ""
if item_type_name == "SourceRule":
identifier = expected_item.get('input_path', f'UnknownSource_at_{parent_context}')
elif item_type_name == "AssetRule":
identifier = expected_item.get('asset_name', f'UnknownAsset_at_{parent_context}')
elif item_type_name == "FileRule":
identifier = expected_item.get('file_path', f'UnknownFile_at_{parent_context}')
current_context = f"{parent_context}/{identifier}" if parent_context else identifier
# Log Extra Fields: Iterate through keys in actual_item.
# If a key is in actual_item but not in expected_item (and is not a list container like "assets" or "files"),
# log this as an informational message.
for key in actual_item.keys():
if key not in expected_item and key not in ["assets", "files"]:
logger.debug(f"Field '{key}' present in actual {item_type_name} ({current_context}) but not specified in expected. Value: '{actual_item[key]}'")
# Check Expected Fields: Iterate through keys in expected_item.
for key, expected_value in expected_item.items():
if key not in actual_item:
logger.error(f"Missing expected field '{key}' in actual {item_type_name} ({current_context}).")
item_match = False
continue # Continue to check other fields in the expected_item
actual_value = actual_item[key]
if key == "assets": # List of AssetRule dictionaries
if not self._compare_list_of_rules(actual_value, expected_value, "AssetRule", current_context, "asset_name"):
item_match = False
elif key == "files": # List of FileRule dictionaries
if not self._compare_list_of_rules(actual_value, expected_value, "FileRule", current_context, "file_path"):
item_match = False
else: # Regular field comparison
if actual_value != expected_value:
# Handle None vs "None" string for preset_name specifically if it's a common issue
if key == "preset_name" and actual_value is None and expected_value == "None":
logger.debug(f"Field '{key}' in {item_type_name} ({current_context}): Actual is None, Expected is string \"None\". Treating as match for now.")
elif key == "target_asset_name_override" and actual_value is not None and expected_value is None:
# If actual has a value (e.g. parent asset name) and expected is null/None,
# this is a mismatch according to strict comparison.
# For a more lenient check, this logic could be adjusted here.
# Current strict comparison will flag this as error, which is what the logs show.
logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
item_match = False
else:
logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
item_match = False
return item_match
def _compare_list_of_rules(self, actual_list: List[Dict[str, Any]], expected_list: List[Dict[str, Any]], item_type_name: str, parent_context: str, item_key_field: str) -> bool:
"""
Compares a list of actual rule items against a list of expected rule items.
Items are matched by a key field (e.g., 'asset_name' or 'file_path').
Order independent for matching, but logs count mismatches.
"""
list_match = True
if not isinstance(actual_list, list) or not isinstance(expected_list, list):
logger.error(f"Type mismatch for list of {item_type_name}s in {parent_context}. Expected lists.")
return False
if len(actual_list) != len(expected_list):
logger.error(f"Mismatch in number of {item_type_name}s for {parent_context}. Actual: {len(actual_list)}, Expected: {len(expected_list)}.")
list_match = False # Count mismatch is an error
# If counts differ, we still try to match what we can to provide more detailed feedback,
# but the overall list_match will remain False.
actual_items_map = {item.get(item_key_field): item for item in actual_list if item.get(item_key_field) is not None}
# Keep track of expected items that found a match to identify missing ones more easily
matched_expected_keys = set()
for expected_item in expected_list:
expected_key_value = expected_item.get(item_key_field)
if expected_key_value is None:
logger.error(f"Expected {item_type_name} in {parent_context} is missing key field '{item_key_field}'. Cannot compare this item: {expected_item}")
list_match = False # This specific expected item cannot be processed
continue
actual_item = actual_items_map.get(expected_key_value)
if actual_item:
matched_expected_keys.add(expected_key_value)
if not self._compare_rule_item(actual_item, expected_item, item_type_name, parent_context):
list_match = False # Individual item comparison failed
else:
logger.error(f"Expected {item_type_name} with {item_key_field} '{expected_key_value}' not found in actual items for {parent_context}.")
list_match = False
# Identify actual items that were not matched by any expected item
# This is useful if len(actual_list) >= len(expected_list) but some actual items are "extra"
for actual_key_value, actual_item_data in actual_items_map.items():
if actual_key_value not in matched_expected_keys:
logger.debug(f"Extra actual {item_type_name} with {item_key_field} '{actual_key_value}' found in {parent_context} (not in expected list or already matched).")
if len(actual_list) != len(expected_list): # If counts already flagged a mismatch, this is just detail
pass
else: # Counts matched, but content didn't align perfectly by key
list_match = False
return list_match
def _compare_rules(self, actual_rules_data: Dict[str, Any], expected_rules_data: Dict[str, Any]) -> bool:
"""
Compares the actual rule data (converted from live SourceRule objects)
with the expected rule data (loaded from JSON).
"""
logger.debug("Comparing actual rules with expected rules...")
actual_source_rules = actual_rules_data.get("source_rules", []) if actual_rules_data else []
expected_source_rules = expected_rules_data.get("source_rules", []) if expected_rules_data else []
if not isinstance(actual_source_rules, list):
logger.error(f"Actual 'source_rules' is not a list. Found type: {type(actual_source_rules)}. Comparison aborted.")
return False # Cannot compare if actual data is malformed
if not isinstance(expected_source_rules, list):
logger.error(f"Expected 'source_rules' is not a list. Found type: {type(expected_source_rules)}. Test configuration error. Comparison aborted.")
return False # Test setup error
if not expected_source_rules and not actual_source_rules:
logger.debug("Both expected and actual source rules lists are empty. Considered a match.")
return True
if len(actual_source_rules) != len(expected_source_rules):
logger.error(f"Mismatch in the number of source rules. Actual: {len(actual_source_rules)}, Expected: {len(expected_source_rules)}.")
# Optionally, log more details about which list is longer/shorter or identifiers if available
return False
overall_match_status = True
for i in range(len(expected_source_rules)):
actual_sr = actual_source_rules[i]
expected_sr = expected_source_rules[i]
# For context, use input_path or an index
source_rule_context = expected_sr.get('input_path', f"SourceRule_index_{i}")
if not self._compare_rule_item(actual_sr, expected_sr, "SourceRule", parent_context=source_rule_context):
overall_match_status = False
# Continue checking other source rules to log all discrepancies
if overall_match_status:
logger.debug("All rules match the expected criteria.") # Covered by "Rule comparison successful" summary
else:
logger.warning("One or more rules did not match the expected criteria. See logs above for details.")
return overall_match_status
def _process_and_display_logs(self, logs_text: str) -> None: # logs_text is no longer the primary source for search
"""
Processes and displays logs, potentially filtering them if --search is used.
Also checks for tracebacks.
Sources logs from the in-memory handler for search and detailed analysis.
"""
logger.debug("--- Log Analysis ---")
global autotest_memory_handler # Access the global handler
log_records = []
if autotest_memory_handler and autotest_memory_handler.buffer:
log_records = autotest_memory_handler.buffer
formatted_log_lines = []
# Define a consistent formatter, similar to what might be expected or useful for search
record_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
# Default asctime format includes milliseconds.
for record in log_records:
formatted_log_lines.append(record_formatter.format(record))
lines_for_search_and_traceback = formatted_log_lines
if not lines_for_search_and_traceback:
logger.warning("No log records found in memory handler. No analysis to perform.")
# Still check the console logs_text for tracebacks if it exists, as a fallback
# or if some critical errors didn't make it to the memory handler (unlikely with DEBUG level)
if logs_text:
logger.debug("Checking provided logs_text (from console) for tracebacks as a fallback.")
console_lines = logs_text.splitlines()
traceback_found_console = False
for i, line in enumerate(console_lines):
if line.strip().startswith("Traceback (most recent call last):"):
logger.error(f"!!! TRACEBACK DETECTED in console logs_text around line {i+1} !!!")
traceback_found_console = True
if traceback_found_console:
logger.warning("A traceback was found in the console logs_text.")
else:
logger.info("No tracebacks found in the console logs_text either.")
logger.info("--- End Log Analysis ---")
return
traceback_found = False
if self.cli_args.search:
logger.info(f"Searching {len(lines_for_search_and_traceback)} in-memory log lines for term '{self.cli_args.search}' with {self.cli_args.additional_lines} context lines.")
matched_line_indices = [i for i, line in enumerate(lines_for_search_and_traceback) if self.cli_args.search in line]
if not matched_line_indices:
logger.info(f"Search term '{self.cli_args.search}' not found in in-memory logs.")
else:
logger.info(f"Found {len(matched_line_indices)} match(es) for '{self.cli_args.search}' in in-memory logs:")
collected_lines_to_print = set()
for match_idx in matched_line_indices:
start_idx = max(0, match_idx - self.cli_args.additional_lines)
end_idx = min(len(lines_for_search_and_traceback), match_idx + self.cli_args.additional_lines + 1)
for i in range(start_idx, end_idx):
# Use i directly as index for lines_for_search_and_traceback, line number is for display
collected_lines_to_print.add(f"L{i+1:05d}: {lines_for_search_and_traceback[i]}")
print("--- Filtered Log Output (from Memory Handler) ---")
for line_to_print in sorted(list(collected_lines_to_print)):
print(line_to_print)
print("--- End Filtered Log Output ---")
# When --search is not supplied, no filtered excerpt is printed; the full in-memory buffer is still scanned for tracebacks below.
# Traceback Check (on lines_for_search_and_traceback)
for i, line in enumerate(lines_for_search_and_traceback):
if line.strip().startswith("Traceback (most recent call last):") or "Traceback (most recent call last):" in line : # More robust check
logger.error(f"!!! TRACEBACK DETECTED in in-memory logs around line index {i} !!!")
logger.error(f"Line content: {line}")
traceback_found = True
if traceback_found:
logger.warning("A traceback was found in the in-memory logs. This usually indicates a significant issue.")
else:
logger.info("No tracebacks found in the in-memory logs.") # This refers to the comprehensive memory logs
logger.info("--- End Log Analysis ---")
def cleanup_and_exit(self, success: bool = True) -> None:
"""Cleans up and exits the application."""
global autotest_memory_handler
if autotest_memory_handler:
logger.debug("Clearing memory log handler buffer and removing handler.")
autotest_memory_handler.buffer = [] # Clear buffer
logging.getLogger().removeHandler(autotest_memory_handler) # Remove handler
autotest_memory_handler.close() # MemoryHandler close is a no-op but good practice
autotest_memory_handler = None
logger.info(f"Test {'succeeded' if success else 'failed'}. Cleaning up and exiting...") # KEEP INFO - Passes filter
q_app = QCoreApplication.instance()
if q_app:
q_app.quit()
sys.exit(0 if success else 1)
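# A minimal, self-contained sketch of how a buffer-only handler like the global
# 'autotest_memory_handler' consumed in _process_and_display_logs() can be attached.
# The real setup lives earlier in this script; the capacity and levels below are assumptions.
def _attach_memory_handler_sketch():
    import logging.handlers
    handler = logging.handlers.MemoryHandler(
        capacity=100_000,                   # large capacity so records stay in handler.buffer
        flushLevel=logging.CRITICAL + 1,    # never auto-flush based on record severity
        target=logging.NullHandler(),       # if a flush does happen, records go nowhere
    )
    handler.setLevel(logging.DEBUG)
    logging.getLogger().addHandler(handler)
    return handler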
# --- Main Execution ---
def main():
"""Main function to run the autotest script."""
cli_args = parse_arguments()
# Logger is configured above, this will now use the new filtered setup
logger.info(f"Parsed CLI arguments: {cli_args}") # KEEP INFO - Passes filter
# Clean and ensure output directory exists
output_dir_path = Path(cli_args.outputdir)
logger.debug(f"Preparing output directory: {output_dir_path}")
try:
if output_dir_path.exists():
logger.debug(f"Output directory {output_dir_path} exists. Cleaning its contents...")
for item in output_dir_path.iterdir():
if item.is_dir():
shutil.rmtree(item)
logger.debug(f"Removed directory: {item}")
else:
item.unlink()
logger.debug(f"Removed file: {item}")
logger.debug(f"Contents of {output_dir_path} cleaned.")
else:
logger.debug(f"Output directory {output_dir_path} does not exist. Creating it.")
output_dir_path.mkdir(parents=True, exist_ok=True) # Ensure it exists after cleaning/if it didn't exist
logger.debug(f"Output directory {output_dir_path} is ready.")
except Exception as e:
logger.error(f"Could not prepare output directory {output_dir_path}: {e}", exc_info=True)
sys.exit(1)
# Initialize QApplication
# QCoreApplication would suffice for test logic that never touches GUI widgets, but MainWindow and
# its widgets are constructed by App, so a full QApplication is required here.
q_app = QApplication.instance()
if not q_app:
q_app = QApplication(sys.argv)
if not q_app: # Still no app
logger.error("Failed to initialize QApplication.")
sys.exit(1)
logger.debug("Initializing main.App()...")
try:
# Instantiate main.App(); this is expected to create MainWindow without showing it, since the GUI
# only becomes visible when app.main_window.show() is called.
app_instance = App()
except Exception as e:
logger.error(f"Failed to initialize main.App: {e}", exc_info=True)
sys.exit(1)
if not app_instance.main_window:
logger.error("main.App initialized, but main_window is None. Cannot proceed with test.")
sys.exit(1)
logger.debug("Initializing AutoTester...")
try:
tester = AutoTester(app_instance, cli_args)
except Exception as e:
logger.error(f"Failed to initialize AutoTester: {e}", exc_info=True)
sys.exit(1)
# Use QTimer.singleShot to start the test after the Qt event loop has started.
# This ensures that the Qt environment is fully set up.
logger.debug("Scheduling test run...")
QTimer.singleShot(0, tester.run_test)
logger.debug("Starting Qt application event loop...")
exit_code = q_app.exec()
logger.debug(f"Qt application event loop finished with exit code: {exit_code}")
sys.exit(exit_code)
if __name__ == "__main__":
main()

View File

@@ -46,7 +46,10 @@
"TEMP_DIR_PREFIX": "_PROCESS_ASSET_", "TEMP_DIR_PREFIX": "_PROCESS_ASSET_",
"INITIAL_SCALING_MODE": "POT_DOWNSCALE", "INITIAL_SCALING_MODE": "POT_DOWNSCALE",
"MERGE_DIMENSION_MISMATCH_STRATEGY": "USE_LARGEST", "MERGE_DIMENSION_MISMATCH_STRATEGY": "USE_LARGEST",
"ENABLE_LOW_RESOLUTION_FALLBACK": true,
"LOW_RESOLUTION_THRESHOLD": 512,
"general_settings": { "general_settings": {
"invert_normal_map_green_channel_globally": false "invert_normal_map_green_channel_globally": false,
"app_version": "Pre-Alpha"
} }
} }

View File

@@ -190,7 +190,7 @@
 ],
 "is_grayscale": false,
 "keybind": "E",
-"standard_type": ""
+"standard_type": "EXTRA"
 },
 "FILE_IGNORE": {
 "bit_depth_rule": "",

View File

@@ -4,6 +4,7 @@ from pathlib import Path
 import logging
 import re
 import collections.abc
+from typing import Optional

 log = logging.getLogger(__name__)

@@ -12,7 +13,7 @@ APP_SETTINGS_PATH = BASE_DIR / "config" / "app_settings.json"
 LLM_SETTINGS_PATH = BASE_DIR / "config" / "llm_settings.json"
 ASSET_TYPE_DEFINITIONS_PATH = BASE_DIR / "config" / "asset_type_definitions.json"
 FILE_TYPE_DEFINITIONS_PATH = BASE_DIR / "config" / "file_type_definitions.json"
-USER_SETTINGS_PATH = BASE_DIR / "config" / "user_settings.json" # New path for user settings
+USER_SETTINGS_PATH = BASE_DIR / "config" / "user_settings.json"
 SUPPLIERS_CONFIG_PATH = BASE_DIR / "config" / "suppliers.json"
 PRESETS_DIR = BASE_DIR / "Presets"

@@ -649,6 +650,24 @@ class Configuration:
         """Returns the LLM request timeout in seconds from LLM settings."""
         return self._llm_settings.get('llm_request_timeout', 120)

+    @property
+    def app_version(self) -> Optional[str]:
+        """Returns the application version from general_settings."""
+        gs = self._core_settings.get('general_settings')
+        if isinstance(gs, dict):
+            return gs.get('app_version')
+        return None
+
+    @property
+    def enable_low_resolution_fallback(self) -> bool:
+        """Gets the setting for enabling low-resolution fallback."""
+        return self._core_settings.get('ENABLE_LOW_RESOLUTION_FALLBACK', True)
+
+    @property
+    def low_resolution_threshold(self) -> int:
+        """Gets the pixel dimension threshold for low-resolution fallback."""
+        return self._core_settings.get('LOW_RESOLUTION_THRESHOLD', 512)
+
     @property
     def FILE_TYPE_DEFINITIONS(self) -> dict:
         return self._file_type_definitions
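
A minimal sketch of how a consumer might combine the two new low-resolution properties exposed above; the helper name and the gating rule are assumptions for illustration, not code from this changeset:

    def should_use_low_resolution_fallback(config, width: int, height: int) -> bool:
        # Hypothetical helper: route small textures to a LOWRES-only path when the fallback is enabled.
        return config.enable_low_resolution_fallback and min(width, height) < config.low_resolution_threshold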

Binary file not shown.

BIN
context_portal/context.db Normal file

Binary file not shown.

View File

@@ -126,12 +126,15 @@ class SupplierSearchDelegate(QStyledItemDelegate):
"""Loads the list of known suppliers from the JSON config file.""" """Loads the list of known suppliers from the JSON config file."""
try: try:
with open(SUPPLIERS_CONFIG_PATH, 'r') as f: with open(SUPPLIERS_CONFIG_PATH, 'r') as f:
suppliers = json.load(f) suppliers_data = json.load(f) # Renamed variable for clarity
if isinstance(suppliers, list): if isinstance(suppliers_data, list):
# Ensure all items are strings # Ensure all items are strings
return sorted([str(s) for s in suppliers if isinstance(s, str)]) return sorted([str(s) for s in suppliers_data if isinstance(s, str)])
else: elif isinstance(suppliers_data, dict): # ADDED: Handle dictionary case
log.warning(f"'{SUPPLIERS_CONFIG_PATH}' does not contain a valid list. Starting fresh.") # If it's a dictionary, extract keys as supplier names
return sorted([str(key) for key in suppliers_data.keys() if isinstance(key, str)])
else: # MODIFIED: Updated warning message
log.warning(f"'{SUPPLIERS_CONFIG_PATH}' does not contain a valid list or dictionary of suppliers. Starting fresh.")
return [] return []
except FileNotFoundError: except FileNotFoundError:
log.info(f"'{SUPPLIERS_CONFIG_PATH}' not found. Starting with an empty supplier list.") log.info(f"'{SUPPLIERS_CONFIG_PATH}' not found. Starting with an empty supplier list.")

View File

@@ -20,7 +20,8 @@ script_dir = Path(__file__).parent
 project_root = script_dir.parent
 PRESETS_DIR = project_root / "Presets"
 TEMPLATE_PATH = PRESETS_DIR / "_template.json"
-APP_SETTINGS_PATH_LOCAL = project_root / "config" / "app_settings.json"
+APP_SETTINGS_PATH_LOCAL = project_root / "config" / "app_settings.json" # Retain for other settings if used elsewhere
+FILE_TYPE_DEFINITIONS_PATH = project_root / "config" / "file_type_definitions.json"

 log = logging.getLogger(__name__)
@@ -63,18 +64,19 @@ class PresetEditorWidget(QWidget):
"""Loads FILE_TYPE_DEFINITIONS keys from app_settings.json.""" """Loads FILE_TYPE_DEFINITIONS keys from app_settings.json."""
keys = [] keys = []
try: try:
if APP_SETTINGS_PATH_LOCAL.is_file(): if FILE_TYPE_DEFINITIONS_PATH.is_file():
with open(APP_SETTINGS_PATH_LOCAL, 'r', encoding='utf-8') as f: with open(FILE_TYPE_DEFINITIONS_PATH, 'r', encoding='utf-8') as f:
settings = json.load(f) settings = json.load(f)
# The FILE_TYPE_DEFINITIONS key is at the root of file_type_definitions.json
ftd = settings.get("FILE_TYPE_DEFINITIONS", {}) ftd = settings.get("FILE_TYPE_DEFINITIONS", {})
keys = list(ftd.keys()) keys = list(ftd.keys())
log.debug(f"Successfully loaded {len(keys)} FILE_TYPE_DEFINITIONS keys.") log.debug(f"Successfully loaded {len(keys)} FILE_TYPE_DEFINITIONS keys from {FILE_TYPE_DEFINITIONS_PATH}.")
else: else:
log.error(f"app_settings.json not found at {APP_SETTINGS_PATH_LOCAL} for PresetEditorWidget.") log.error(f"file_type_definitions.json not found at {FILE_TYPE_DEFINITIONS_PATH} for PresetEditorWidget.")
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
log.error(f"Failed to parse app_settings.json in PresetEditorWidget: {e}") log.error(f"Failed to parse file_type_definitions.json in PresetEditorWidget: {e}")
except Exception as e: except Exception as e:
log.error(f"Error loading FILE_TYPE_DEFINITIONS keys in PresetEditorWidget: {e}") log.error(f"Error loading FILE_TYPE_DEFINITIONS keys from {FILE_TYPE_DEFINITIONS_PATH} in PresetEditorWidget: {e}")
return keys return keys
def _init_ui(self): def _init_ui(self):

View File

@@ -552,6 +552,13 @@ class UnifiedViewModel(QAbstractItemModel):
             supplier_col_index = self.createIndex(existing_source_row, self.COL_SUPPLIER, existing_source_rule)
             self.dataChanged.emit(supplier_col_index, supplier_col_index, [Qt.DisplayRole, Qt.EditRole])

+        # Always update the preset_name from the new_source_rule, as this reflects the latest prediction context
+        if existing_source_rule.preset_name != new_source_rule.preset_name:
+            log.debug(f" Updating preset_name for SourceRule '{source_path}' from '{existing_source_rule.preset_name}' to '{new_source_rule.preset_name}'")
+            existing_source_rule.preset_name = new_source_rule.preset_name
+        # Note: preset_name is not directly displayed in the view, so no dataChanged needed for a specific column,
+        # but if it influenced other display elements, dataChanged would be emitted for those.
+
         # --- Merge AssetRules ---
         existing_assets_dict = {asset.asset_name: asset for asset in existing_source_rule.assets}

25
main.py
View File

@@ -4,6 +4,7 @@ import time
 import os
 import logging
 from pathlib import Path
+import re # Added for checking incrementing token
 from concurrent.futures import ProcessPoolExecutor, as_completed
 import subprocess
 import shutil
@@ -238,9 +239,14 @@ class ProcessingTask(QRunnable):
             # output_dir should already be a Path object
             pattern = getattr(config, 'output_directory_pattern', None)
             if pattern:
-                log.debug(f"Calculating next incrementing value for dir: {output_dir} using pattern: {pattern}")
-                next_increment_str = get_next_incrementing_value(output_dir, pattern)
-                log.info(f"Calculated next incrementing value for {output_dir}: {next_increment_str}")
+                # Only call get_next_incrementing_value if the pattern contains an incrementing token
+                if re.search(r"\[IncrementingValue\]|#+", pattern):
+                    log.debug(f"Incrementing token found in pattern '{pattern}'. Calculating next value for dir: {output_dir}")
+                    next_increment_str = get_next_incrementing_value(output_dir, pattern)
+                    log.info(f"Calculated next incrementing value for {output_dir}: {next_increment_str}")
+                else:
+                    log.debug(f"No incrementing token found in pattern '{pattern}'. Skipping increment calculation.")
+                    next_increment_str = None # Or a default like "00" if downstream expects a string, but None is cleaner if handled.
             else:
                 log.warning(f"Cannot calculate incrementing value: 'output_directory_pattern' not found in configuration for preset {config.preset_name}")
         except Exception as e:
@@ -401,11 +407,13 @@ class App(QObject):
         # --- Get paths needed for ProcessingTask ---
         try:
-            # Access output path via MainPanelWidget
-            output_base_path_str = self.main_window.main_panel_widget.output_path_edit.text().strip()
+            # Get output_dir from processing_settings passed from autotest.py
+            output_base_path_str = processing_settings.get("output_dir")
+            log.info(f"APP_DEBUG: Received output_dir in processing_settings: {output_base_path_str}")
             if not output_base_path_str:
-                log.error("Cannot queue tasks: Output directory path is empty in the GUI.")
-                self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000)
+                log.error("Cannot queue tasks: Output directory path is empty in processing_settings.")
+                # self.main_window.statusBar().showMessage("Error: Output directory cannot be empty.", 5000) # GUI specific
                 return
             output_base_path = Path(output_base_path_str)
             # Basic validation - check if it's likely a valid path structure (doesn't guarantee existence/writability here)
@@ -477,8 +485,9 @@ class App(QObject):
                 engine=task_engine,
                 rule=rule,
                 workspace_path=workspace_path,
-                output_base_path=output_base_path
+                output_base_path=output_base_path # This is Path(output_base_path_str)
             )
+            log.info(f"APP_DEBUG: Passing to ProcessingTask: output_base_path = {output_base_path}")
             task.signals.finished.connect(self._on_task_finished)
             log.debug(f"DEBUG: Calling thread_pool.start() for task {i+1}")
             self.thread_pool.start(task)
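
A quick standalone illustration of the incrementing-token check introduced above; the pattern strings are invented examples, only the regular expression comes from this change:

    import re

    INCREMENT_TOKEN = re.compile(r"\[IncrementingValue\]|#+")

    assert INCREMENT_TOKEN.search("[AssetName]_[IncrementingValue]") is not None  # explicit token present
    assert INCREMENT_TOKEN.search("[AssetName]_###") is not None                  # any run of '#' also counts
    assert INCREMENT_TOKEN.search("[AssetName]_[Supplier]") is None               # no token, calculation is skipped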

View File

@@ -195,17 +195,25 @@ def _process_archive_task(archive_path: Path, output_dir: Path, processed_dir: P
             # Assuming config object has 'output_directory_pattern' attribute/key
             pattern = getattr(config, 'output_directory_pattern', None) # Use getattr for safety
             if pattern:
-                log.debug(f"[Task:{archive_path.name}] Calculating next incrementing value for dir: {output_dir} using pattern: {pattern}")
-                next_increment_str = get_next_incrementing_value(output_dir, pattern)
-                log.info(f"[Task:{archive_path.name}] Calculated next incrementing value: {next_increment_str}")
+                if re.search(r"\[IncrementingValue\]|#+", pattern):
+                    log.debug(f"[Task:{archive_path.name}] Incrementing token found in pattern '{pattern}'. Calculating next value for dir: {output_dir}")
+                    next_increment_str = get_next_incrementing_value(output_dir, pattern)
+                    log.info(f"[Task:{archive_path.name}] Calculated next incrementing value: {next_increment_str}")
+                else:
+                    log.debug(f"[Task:{archive_path.name}] No incrementing token found in pattern '{pattern}'. Skipping increment calculation.")
+                    next_increment_str = None
             else:
                 # Check if config is a dict as fallback (depends on load_config implementation)
                 if isinstance(config, dict):
                     pattern = config.get('output_directory_pattern')
                     if pattern:
-                        log.debug(f"[Task:{archive_path.name}] Calculating next incrementing value for dir: {output_dir} using pattern (from dict): {pattern}")
-                        next_increment_str = get_next_incrementing_value(output_dir, pattern)
-                        log.info(f"[Task:{archive_path.name}] Calculated next incrementing value (from dict): {next_increment_str}")
+                        if re.search(r"\[IncrementingValue\]|#+", pattern):
+                            log.debug(f"[Task:{archive_path.name}] Incrementing token found in pattern '{pattern}' (from dict). Calculating next value for dir: {output_dir}")
+                            next_increment_str = get_next_incrementing_value(output_dir, pattern)
+                            log.info(f"[Task:{archive_path.name}] Calculated next incrementing value (from dict): {next_increment_str}")
+                        else:
+                            log.debug(f"[Task:{archive_path.name}] No incrementing token found in pattern '{pattern}' (from dict). Skipping increment calculation.")
+                            next_increment_str = None
                     else:
                         log.warning(f"[Task:{archive_path.name}] Cannot calculate incrementing value: 'output_directory_pattern' not found in configuration dictionary.")
                 else:

View File

@@ -1,3 +1,4 @@
+import dataclasses # Added import
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Dict, List, Optional
@@ -27,6 +28,7 @@ class ProcessedRegularMapData:
     original_bit_depth: Optional[int]
     original_dimensions: Optional[Tuple[int, int]] # (width, height)
     transformations_applied: List[str]
+    resolution_key: Optional[str] = None # Added field
     status: str = "Processed"
     error_message: Optional[str] = None
@@ -45,9 +47,10 @@ class ProcessedMergedMapData:
 @dataclass
 class InitialScalingInput:
     image_data: np.ndarray
+    initial_scaling_mode: str # Moved before fields with defaults
     original_dimensions: Optional[Tuple[int, int]] # (width, height)
+    resolution_key: Optional[str] = None # Added field
     # Configuration needed
-    initial_scaling_mode: str

 # Output for InitialScalingStage
 @dataclass
@@ -55,6 +58,7 @@ class InitialScalingOutput:
     scaled_image_data: np.ndarray
     scaling_applied: bool
     final_dimensions: Tuple[int, int] # (width, height)
+    resolution_key: Optional[str] = None # Added field

 # Input for SaveVariantsStage
 @dataclass

View File

@@ -8,7 +8,7 @@ from typing import List, Dict, Optional, Any, Union # Added Any, Union
 import numpy as np # Added numpy
 from configuration import Configuration
-from rule_structure import SourceRule, AssetRule, FileRule # Added FileRule
+from rule_structure import SourceRule, AssetRule, FileRule, ProcessingItem # Added ProcessingItem

 # Import new context classes and stages
 from .asset_context import (
@@ -200,145 +200,224 @@ class PipelineOrchestrator:
current_image_data: Optional[np.ndarray] = None # Track current image data ref current_image_data: Optional[np.ndarray] = None # Track current image data ref
try: try:
# 1. Process (Load/Merge + Transform) # The 'item' is now expected to be a ProcessingItem or MergeTaskDefinition
if isinstance(item, FileRule):
if item.item_type == 'EXTRA': if isinstance(item, ProcessingItem):
log.debug(f"{item_log_prefix}: Skipping image processing for EXTRA FileRule '{item.file_path}'.") item_key = f"{item.source_file_info_ref}_{item.map_type_identifier}_{item.resolution_key}"
# Add a basic entry to processed_maps_details to acknowledge it was seen item_log_prefix = f"Asset '{asset_name}', ProcItem '{item_key}'"
context.processed_maps_details[item.file_path] = { log.info(f"{item_log_prefix}: Starting processing.")
"status": "Skipped (EXTRA file)",
"internal_map_type": "EXTRA",
"source_file": str(item.file_path)
}
continue # Skip to the next item
item_key = item.file_path # Use file_path string as key
log.debug(f"{item_log_prefix}: Processing FileRule '{item.file_path}'...")
processed_data = self._regular_processor_stage.execute(context, item)
elif isinstance(item, MergeTaskDefinition):
item_key = item.task_key # Use task_key string as key
log.info(f"{item_log_prefix}: Executing MergedTaskProcessorStage for MergeTask '{item_key}'...") # Log call
processed_data = self._merged_processor_stage.execute(context, item)
# Log status/error from merge processor
if processed_data:
log.info(f"{item_log_prefix}: MergedTaskProcessorStage result - Status: {processed_data.status}, Error: {processed_data.error_message}")
else:
log.warning(f"{item_log_prefix}: MergedTaskProcessorStage returned None for MergeTask '{item_key}'.")
else:
log.warning(f"{item_log_prefix}: Unknown item type '{type(item)}'. Skipping.")
item_key = f"unknown_item_{item_index}"
context.processed_maps_details[item_key] = {"status": "Skipped", "notes": f"Unknown item type {type(item)}"}
asset_had_item_errors = True
continue # Next item
# Check for processing failure # Data for ProcessingItem is already loaded by PrepareProcessingItemsStage
if not processed_data or processed_data.status != "Processed": current_image_data = item.image_data
error_msg = processed_data.error_message if processed_data else "Processor returned None" current_dimensions = item.current_dimensions
log.error(f"{item_log_prefix}: Failed during processing stage. Error: {error_msg}") item_resolution_key = item.resolution_key
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Processing Error: {error_msg}", "stage": processed_data.__class__.__name__ if processed_data else "UnknownProcessor"}
asset_had_item_errors = True # Transformations (like gloss to rough, normal invert) are assumed to be applied
continue # Next item # by RegularMapProcessorStage if it's still used, or directly in PrepareProcessingItemsStage
# before creating the ProcessingItem, or a new dedicated transformation stage.
# For now, assume item.image_data is ready for scaling/saving.
# Store initial ProcessingItem data as "processed_data" for consistency if RegularMapProcessor is bypassed
# This is a simplification; a dedicated transformation stage would be cleaner.
# For now, we assume transformations happened before or within PrepareProcessingItemsStage.
# The 'processed_data' variable here is more of a placeholder for what would feed into scaling.
# Create a simple ProcessedRegularMapData-like structure for logging/details if needed,
# or adapt the final_details population later.
# For now, we'll directly use 'item' fields.
# Store intermediate result & get current image data # 2. Scale (Optional)
context.intermediate_results[item_key] = processed_data scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE")
current_image_data = processed_data.processed_image_data if isinstance(processed_data, ProcessedRegularMapData) else processed_data.merged_image_data # Pass the item's resolution_key to InitialScalingInput
current_dimensions = processed_data.original_dimensions if isinstance(processed_data, ProcessedRegularMapData) else processed_data.final_dimensions
# 2. Scale (Optional)
scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE")
if scaling_mode != "NONE" and current_image_data is not None and current_image_data.size > 0:
if isinstance(item, MergeTaskDefinition): # Log scaling call for merge tasks
log.info(f"{item_log_prefix}: Calling InitialScalingStage for MergeTask '{item_key}' (Mode: {scaling_mode})...")
log.debug(f"{item_log_prefix}: Applying initial scaling (Mode: {scaling_mode})...")
scale_input = InitialScalingInput( scale_input = InitialScalingInput(
image_data=current_image_data, image_data=current_image_data,
original_dimensions=current_dimensions, # Pass original/merged dims original_dimensions=current_dimensions,
initial_scaling_mode=scaling_mode initial_scaling_mode=scaling_mode,
resolution_key=item_resolution_key # Pass the key
) )
# Add _source_file_path for logging within InitialScalingStage if available
setattr(scale_input, '_source_file_path', item.source_file_info_ref)
log.debug(f"{item_log_prefix}: Calling InitialScalingStage. Input res_key: {scale_input.resolution_key}")
scaled_data_output = self._scaling_stage.execute(scale_input) scaled_data_output = self._scaling_stage.execute(scale_input)
# Update intermediate result and current image data reference current_image_data = scaled_data_output.scaled_image_data
context.intermediate_results[item_key] = scaled_data_output # Overwrite previous intermediate current_dimensions = scaled_data_output.final_dimensions # Dimensions after scaling
current_image_data = scaled_data_output.scaled_image_data # Use scaled data for saving # The resolution_key from item is passed through by InitialScalingOutput
log.debug(f"{item_log_prefix}: Scaling applied: {scaled_data_output.scaling_applied}. New Dims: {scaled_data_output.final_dimensions}") output_resolution_key = scaled_data_output.resolution_key
else: log.debug(f"{item_log_prefix}: InitialScalingStage output. Scaled: {scaled_data_output.scaling_applied}, New Dims: {current_dimensions}, Output ResKey: {output_resolution_key}")
log.debug(f"{item_log_prefix}: Initial scaling skipped (Mode: NONE or empty image).") context.intermediate_results[item_key] = scaled_data_output
# Create dummy output if scaling skipped, using current dims
final_dims = current_dimensions if current_dimensions else (current_image_data.shape[1], current_image_data.shape[0]) if current_image_data is not None else (0,0)
scaled_data_output = InitialScalingOutput(scaled_image_data=current_image_data, scaling_applied=False, final_dimensions=final_dims)
# 3. Save Variants # 3. Save Variants
if current_image_data is None or current_image_data.size == 0: if current_image_data is None or current_image_data.size == 0:
log.warning(f"{item_log_prefix}: Skipping save stage because image data is empty.") log.warning(f"{item_log_prefix}: Skipping save stage because image data is empty.")
context.processed_maps_details[item_key] = {"status": "Skipped", "notes": "No image data to save", "stage": "SaveVariantsStage"} context.processed_maps_details[item_key] = {"status": "Skipped", "notes": "No image data to save", "stage": "SaveVariantsStage"}
# Don't mark as asset error, just skip this item's saving continue
continue # Next item
if isinstance(item, MergeTaskDefinition): # Log save call for merge tasks log.debug(f"{item_log_prefix}: Preparing to save variant with resolution key '{output_resolution_key}'...")
log.info(f"{item_log_prefix}: Calling SaveVariantsStage for MergeTask '{item_key}'...")
log.debug(f"{item_log_prefix}: Saving variants...") output_filename_tokens = {
# Prepare input for save stage 'asset_name': asset_name,
internal_map_type = processed_data.final_internal_map_type if isinstance(processed_data, ProcessedRegularMapData) else processed_data.output_map_type 'output_base_directory': context.engine_temp_dir,
source_bit_depth = [processed_data.original_bit_depth] if isinstance(processed_data, ProcessedRegularMapData) and processed_data.original_bit_depth is not None else processed_data.source_bit_depths if isinstance(processed_data, ProcessedMergedMapData) else [8] # Default bit depth if unknown 'supplier': context.effective_supplier or 'UnknownSupplier',
'resolution': output_resolution_key # Use the key from the item/scaling stage
# Construct filename tokens (ensure temp dir is used)
output_filename_tokens = {
'asset_name': asset_name,
'output_base_directory': context.engine_temp_dir, # Save variants to temp dir
# Add other tokens from context/config as needed by the pattern
'supplier': context.effective_supplier or 'UnknownSupplier',
}
# Log the value being read for the threshold before creating the input object
log.info(f"ORCHESTRATOR_DEBUG: Reading RESOLUTION_THRESHOLD_FOR_JPG from config for SaveVariantsInput: {getattr(context.config_obj, 'RESOLUTION_THRESHOLD_FOR_JPG', None)}")
save_input = SaveVariantsInput(
image_data=current_image_data, # Use potentially scaled data
internal_map_type=internal_map_type,
source_bit_depth_info=source_bit_depth,
output_filename_pattern_tokens=output_filename_tokens,
# Pass config values needed by save stage
image_resolutions=context.config_obj.image_resolutions,
file_type_defs=getattr(context.config_obj, "FILE_TYPE_DEFINITIONS", {}),
output_format_8bit=context.config_obj.get_8bit_output_format(),
output_format_16bit_primary=context.config_obj.get_16bit_output_formats()[0],
output_format_16bit_fallback=context.config_obj.get_16bit_output_formats()[1],
png_compression_level=context.config_obj.png_compression_level,
jpg_quality=context.config_obj.jpg_quality,
output_filename_pattern=context.config_obj.output_filename_pattern,
resolution_threshold_for_jpg=getattr(context.config_obj, "resolution_threshold_for_jpg", None) # Corrected case
)
saved_data = self._save_stage.execute(save_input)
# Log saved_data for merge tasks
if isinstance(item, MergeTaskDefinition):
log.info(f"{item_log_prefix}: SaveVariantsStage result for MergeTask '{item_key}' - Status: {saved_data.status if saved_data else 'N/A'}, Saved Files: {len(saved_data.saved_files_details) if saved_data else 0}")
# Check save status and finalize item result
if saved_data and saved_data.status.startswith("Processed"):
item_status = saved_data.status # e.g., "Processed" or "Processed (No Output)"
log.info(f"{item_log_prefix}: Item successfully processed and saved. Status: {item_status}")
# Populate final details for this item
final_details = {
"status": item_status,
"saved_files_info": saved_data.saved_files_details, # List of dicts from save util
"internal_map_type": internal_map_type,
"original_dimensions": processed_data.original_dimensions if isinstance(processed_data, ProcessedRegularMapData) else None,
"final_dimensions": scaled_data_output.final_dimensions if scaled_data_output else current_dimensions,
"transformations": processed_data.transformations_applied if isinstance(processed_data, ProcessedRegularMapData) else processed_data.transformations_applied_to_inputs,
# Add source file if regular map
"source_file": str(processed_data.source_file_path) if isinstance(processed_data, ProcessedRegularMapData) else None,
} }
# Log final details addition for merge tasks
if isinstance(item, MergeTaskDefinition): # Determine image_resolutions argument for save_image_variants
log.info(f"{item_log_prefix}: Adding final details to context.processed_maps_details for MergeTask '{item_key}'. Details: {final_details}") save_specific_resolutions = {}
context.processed_maps_details[item_key] = final_details if output_resolution_key == "LOWRES":
# For LOWRES, the "resolution value" is its actual dimension.
# image_saving_utils needs a dict like {"LOWRES": 64} if current_dim is 64x64
# Assuming current_dimensions[0] is width.
save_specific_resolutions = {"LOWRES": current_dimensions[0] if current_dimensions else 0}
log.debug(f"{item_log_prefix}: Preparing to save LOWRES variant. Dimensions: {current_dimensions}. Save resolutions arg: {save_specific_resolutions}")
elif output_resolution_key in context.config_obj.image_resolutions:
save_specific_resolutions = {output_resolution_key: context.config_obj.image_resolutions[output_resolution_key]}
else:
log.warning(f"{item_log_prefix}: Resolution key '{output_resolution_key}' not found in config.image_resolutions and not LOWRES. Saving might fail or use full res.")
# Fallback: pass all configured resolutions, image_saving_utils will try to match by size.
# This might not be ideal if the key is truly unknown.
# Or, more strictly, fail here if key is unknown and not LOWRES.
# For now, let image_saving_utils handle it by passing all.
save_specific_resolutions = context.config_obj.image_resolutions
save_input = SaveVariantsInput(
image_data=current_image_data,
internal_map_type=item.map_type_identifier,
source_bit_depth_info=[item.bit_depth] if item.bit_depth is not None else [8], # Default to 8 if not set
output_filename_pattern_tokens=output_filename_tokens,
image_resolutions=save_specific_resolutions, # Pass the specific resolution(s)
file_type_defs=getattr(context.config_obj, "FILE_TYPE_DEFINITIONS", {}),
output_format_8bit=context.config_obj.get_8bit_output_format(),
output_format_16bit_primary=context.config_obj.get_16bit_output_formats()[0],
output_format_16bit_fallback=context.config_obj.get_16bit_output_formats()[1],
png_compression_level=context.config_obj.png_compression_level,
jpg_quality=context.config_obj.jpg_quality,
output_filename_pattern=context.config_obj.output_filename_pattern,
resolution_threshold_for_jpg=getattr(context.config_obj, "resolution_threshold_for_jpg", None)
)
saved_data = self._save_stage.execute(save_input)
if saved_data and saved_data.status.startswith("Processed"):
item_status = saved_data.status
log.info(f"{item_log_prefix}: Item successfully processed and saved. Status: {item_status}")
context.processed_maps_details[item_key] = {
"status": item_status,
"saved_files_info": saved_data.saved_files_details,
"internal_map_type": item.map_type_identifier,
"resolution_key": output_resolution_key,
"original_dimensions": item.original_dimensions,
"final_dimensions": current_dimensions, # Dimensions after scaling
"source_file": item.source_file_info_ref,
}
else:
error_msg = saved_data.error_message if saved_data else "Save stage returned None"
log.error(f"{item_log_prefix}: Failed during save stage. Error: {error_msg}")
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Save Error: {error_msg}", "stage": "SaveVariantsStage"}
asset_had_item_errors = True
item_status = "Failed"
elif isinstance(item, MergeTaskDefinition):
# --- This part needs similar refactoring for resolution_key if merged outputs can be LOWRES ---
# --- For now, assume merged tasks always produce standard resolutions ---
item_key = item.task_key
item_log_prefix = f"Asset '{asset_name}', MergeTask '{item_key}'"
log.info(f"{item_log_prefix}: Processing MergeTask.")
# 1. Process Merge Task
processed_data = self._merged_processor_stage.execute(context, item)
if not processed_data or processed_data.status != "Processed":
error_msg = processed_data.error_message if processed_data else "Merge processor returned None"
log.error(f"{item_log_prefix}: Failed during merge processing. Error: {error_msg}")
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Merge Error: {error_msg}", "stage": "MergedTaskProcessorStage"}
asset_had_item_errors = True
continue
context.intermediate_results[item_key] = processed_data
current_image_data = processed_data.merged_image_data
current_dimensions = processed_data.final_dimensions
# 2. Scale Merged Output (Optional)
# Merged tasks typically don't have a single "resolution_key" like LOWRES from source.
# They produce an image that then gets downscaled to 1K, PREVIEW etc.
# So, resolution_key for InitialScalingInput here would be None or a default.
scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE")
scale_input = InitialScalingInput(
image_data=current_image_data,
original_dimensions=current_dimensions,
initial_scaling_mode=scaling_mode,
resolution_key=None # Merged outputs are not "LOWRES" themselves before this scaling
)
setattr(scale_input, '_source_file_path', f"MergeTask_{item_key}") # For logging
log.debug(f"{item_log_prefix}: Calling InitialScalingStage for merged data.")
scaled_data_output = self._scaling_stage.execute(scale_input)
current_image_data = scaled_data_output.scaled_image_data
current_dimensions = scaled_data_output.final_dimensions
# Merged items don't have a specific output_resolution_key from source,
# they will be saved to all applicable resolutions from config.
# So scaled_data_output.resolution_key will be None here.
context.intermediate_results[item_key] = scaled_data_output
# 3. Save Merged Variants
if current_image_data is None or current_image_data.size == 0:
log.warning(f"{item_log_prefix}: Skipping save for merged task, image data is empty.")
context.processed_maps_details[item_key] = {"status": "Skipped", "notes": "No merged image data to save", "stage": "SaveVariantsStage"}
continue
output_filename_tokens = {
'asset_name': asset_name,
'output_base_directory': context.engine_temp_dir,
'supplier': context.effective_supplier or 'UnknownSupplier',
# 'resolution' token will be filled by image_saving_utils for each variant
}
# For merged tasks, we usually want to generate all standard resolutions.
# The `resolution_key` from the item itself is not applicable here for the `resolution` token.
# The `image_saving_utils.save_image_variants` will iterate through `context.config_obj.image_resolutions`.
save_input = SaveVariantsInput(
image_data=current_image_data,
internal_map_type=processed_data.output_map_type,
source_bit_depth_info=processed_data.source_bit_depths,
output_filename_pattern_tokens=output_filename_tokens,
image_resolutions=context.config_obj.image_resolutions, # Pass all configured resolutions
file_type_defs=getattr(context.config_obj, "FILE_TYPE_DEFINITIONS", {}),
output_format_8bit=context.config_obj.get_8bit_output_format(),
output_format_16bit_primary=context.config_obj.get_16bit_output_formats()[0],
output_format_16bit_fallback=context.config_obj.get_16bit_output_formats()[1],
png_compression_level=context.config_obj.png_compression_level,
jpg_quality=context.config_obj.jpg_quality,
output_filename_pattern=context.config_obj.output_filename_pattern,
resolution_threshold_for_jpg=getattr(context.config_obj, "resolution_threshold_for_jpg", None)
)
saved_data = self._save_stage.execute(save_input)
if saved_data and saved_data.status.startswith("Processed"):
item_status = saved_data.status
log.info(f"{item_log_prefix}: Merged task successfully processed and saved. Status: {item_status}")
context.processed_maps_details[item_key] = {
"status": item_status,
"saved_files_info": saved_data.saved_files_details,
"internal_map_type": processed_data.output_map_type,
"final_dimensions": current_dimensions,
}
else:
error_msg = saved_data.error_message if saved_data else "Save stage for merged task returned None"
log.error(f"{item_log_prefix}: Failed during save stage for merged task. Error: {error_msg}")
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Save Error (Merged): {error_msg}", "stage": "SaveVariantsStage"}
asset_had_item_errors = True
item_status = "Failed"
else: else:
error_msg = saved_data.error_message if saved_data else "Save stage returned None" log.warning(f"{item_log_prefix}: Unknown item type in loop: {type(item)}. Skipping.")
log.error(f"{item_log_prefix}: Failed during save stage. Error: {error_msg}") # Ensure some key exists to prevent KeyError if item_key was not set
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Save Error: {error_msg}", "stage": "SaveVariantsStage"} unknown_item_key = f"unknown_item_at_index_{item_index}"
context.processed_maps_details[unknown_item_key] = {"status": "Skipped", "notes": f"Unknown item type {type(item)}"}
asset_had_item_errors = True asset_had_item_errors = True
item_status = "Failed" # Ensure item status reflects failure continue
except Exception as e: except Exception as e:
log.exception(f"{item_log_prefix}: Unhandled exception during item processing loop: {e}") log.exception(f"Asset '{asset_name}', Item Loop Index {item_index}: Unhandled exception: {e}")
# Ensure details are recorded even on unhandled exception # Ensure details are recorded even on unhandled exception
if item_key is not None: if item_key is not None:
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Unhandled Loop Error: {e}", "stage": "OrchestratorLoop"} context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Unhandled Loop Error: {e}", "stage": "OrchestratorLoop"}

View File

@@ -1,5 +1,5 @@
 import logging
-from typing import Tuple
+from typing import Tuple, Optional # Added Optional
 import cv2 # Assuming cv2 is available for interpolation flags
 import numpy as np
@@ -7,77 +7,93 @@ import numpy as np
from .base_stage import ProcessingStage from .base_stage import ProcessingStage
# Import necessary context classes and utils # Import necessary context classes and utils
from ..asset_context import InitialScalingInput, InitialScalingOutput from ..asset_context import InitialScalingInput, InitialScalingOutput
# ProcessingItem is no longer created here, so its import can be removed if not used otherwise.
# For now, keep rule_structure import if other elements from it might be needed,
# but ProcessingItem itself is not directly instantiated by this stage anymore.
# from rule_structure import ProcessingItem
from ...utils import image_processing_utils as ipu from ...utils import image_processing_utils as ipu
import numpy as np
import cv2 # Added cv2 for interpolation flags (already used implicitly by ipu.resize_image)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class InitialScalingStage(ProcessingStage): class InitialScalingStage(ProcessingStage):
""" """
    Applies initial Power-of-Two (POT) downscaling to image data if configured
    and if the item is not already a 'LOWRES' variant.
    """
    def execute(self, input_data: InitialScalingInput) -> InitialScalingOutput:
        """
        Applies POT scaling based on input_data.initial_scaling_mode,
        unless input_data.resolution_key is 'LOWRES'.
        Passes through the resolution_key.
        """
        # Safely access source_file_path for logging, if provided by the orchestrator via an underscore attribute.
        source_file_path = getattr(input_data, '_source_file_path', "UnknownSourcePath")
        log_prefix = f"InitialScalingStage (Source: {source_file_path}, ResKey: {input_data.resolution_key})"
        log.debug(f"{log_prefix}: Mode '{input_data.initial_scaling_mode}'. Received resolution_key: '{input_data.resolution_key}'")

        image_to_scale = input_data.image_data
        current_dimensions_wh = input_data.original_dimensions  # Dimensions of image_to_scale
        scaling_mode = input_data.initial_scaling_mode
        output_resolution_key = input_data.resolution_key  # Pass through the resolution key

        if image_to_scale is None or image_to_scale.size == 0:
            log.warning(f"{log_prefix}: Input image data is None or empty. Skipping POT scaling.")
            return InitialScalingOutput(
                scaled_image_data=np.array([]),
                scaling_applied=False,
                final_dimensions=(0, 0),
                resolution_key=output_resolution_key
            )

        if not current_dimensions_wh:
            log.warning(f"{log_prefix}: Original dimensions not provided for POT scaling. Using current image shape.")
            h_pre_pot_scale, w_pre_pot_scale = image_to_scale.shape[:2]
        else:
            w_pre_pot_scale, h_pre_pot_scale = current_dimensions_wh

        final_image_data = image_to_scale  # Default to the input image if no scaling happens
        scaling_applied = False

        # Skip POT scaling if the item is already a LOWRES variant or the scaling mode is NONE.
        if output_resolution_key == "LOWRES":
            log.info(f"{log_prefix}: Item is a 'LOWRES' variant. Skipping POT downscaling.")
        elif scaling_mode == "NONE":
            log.info(f"{log_prefix}: Mode is NONE. No POT scaling applied.")
        elif scaling_mode == "POT_DOWNSCALE":
            pot_w = ipu.get_nearest_power_of_two_downscale(w_pre_pot_scale)
            pot_h = ipu.get_nearest_power_of_two_downscale(h_pre_pot_scale)
            if (pot_w, pot_h) != (w_pre_pot_scale, h_pre_pot_scale):
                log.info(f"{log_prefix}: Applying POT Downscale from ({w_pre_pot_scale},{h_pre_pot_scale}) to ({pot_w},{pot_h}).")
                # Use INTER_AREA for downscaling.
                resized_img = ipu.resize_image(image_to_scale, pot_w, pot_h, interpolation=cv2.INTER_AREA)
                if resized_img is not None:
                    final_image_data = resized_img
                    scaling_applied = True
                    log.debug(f"{log_prefix}: POT Downscale applied successfully.")
                else:
                    log.warning(f"{log_prefix}: POT Downscale resize failed. Using pre-POT-scaled data.")
            else:
                log.info(f"{log_prefix}: Image already POT or smaller. No POT scaling needed.")
        else:
            log.warning(f"{log_prefix}: Unknown INITIAL_SCALING_MODE '{scaling_mode}'. Defaulting to NONE (no scaling).")

        # Determine final dimensions.
        if final_image_data is not None and final_image_data.size > 0:
            final_h, final_w = final_image_data.shape[:2]
            final_dims_wh = (final_w, final_h)
        else:
            final_dims_wh = (0, 0)
            if final_image_data is None:  # Ensure an empty array for consistency if None
                final_image_data = np.array([])

        return InitialScalingOutput(
            scaled_image_data=final_image_data,
            scaling_applied=scaling_applied,
            final_dimensions=final_dims_wh,
            resolution_key=output_resolution_key  # Pass through the resolution key
        )
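`ipu.get_nearest_power_of_two_downscale` is called above but not shown in this diff. A minimal sketch of what such a helper plausibly does, assuming "nearest POT downscale" means the largest power of two not exceeding the dimension (the function name is the project's; the body here is only an illustration):

import math

def get_nearest_power_of_two_downscale(dimension: int) -> int:
    """Largest power of two <= dimension (assumed behaviour), e.g. 1500 -> 1024, 700 -> 512."""
    if dimension < 1:
        return 0
    return 2 ** int(math.floor(math.log2(dimension)))

# Under this assumption, a 1500x1000 source would be POT-downscaled to 1024x512 by the stage above.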

View File

@@ -85,6 +85,7 @@ class MetadataInitializationStage(ProcessingStage):
    merged_maps_details.
    """
    def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
        logger.debug(f"METADATA_INIT_DEBUG: Entry - context.output_base_path = {context.output_base_path}")
        """
        Executes the metadata initialization logic.
@@ -147,12 +148,15 @@ class MetadataInitializationStage(ProcessingStage):
        context.asset_metadata['processing_start_time'] = datetime.datetime.now().isoformat()
        context.asset_metadata['status'] = "Pending"

        app_version_value = None
        if context.config_obj and hasattr(context.config_obj, 'app_version'):
            app_version_value = context.config_obj.app_version
        if app_version_value:
            context.asset_metadata['version'] = app_version_value
        else:
            logger.warning("App version not found using config_obj.app_version. Setting version to 'N/A'.")
            context.asset_metadata['version'] = "N/A"

        if context.incrementing_value is not None:
            context.asset_metadata['incrementing_value'] = context.incrementing_value
@@ -170,4 +174,5 @@ class MetadataInitializationStage(ProcessingStage):
        # Example of how you might log the full metadata for debugging:
        # logger.debug(f"Initialized metadata: {context.asset_metadata}")
        logger.debug(f"METADATA_INIT_DEBUG: Exit - context.output_base_path = {context.output_base_path}")
        return context
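The hasattr-based version lookup above can be expressed more compactly with getattr; a small equivalent sketch (the _Cfg class below is a stand-in, not the project's Configuration object):

from typing import Any, Optional

def resolve_app_version(config_obj: Optional[Any]) -> str:
    """Return config_obj.app_version when present and truthy, else "N/A"."""
    version = getattr(config_obj, 'app_version', None) if config_obj is not None else None
    return version if version else "N/A"

class _Cfg:
    app_version = "1.4.0"  # hypothetical version string

assert resolve_app_version(_Cfg()) == "1.4.0"
assert resolve_app_version(None) == "N/A"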

View File

@@ -17,8 +17,16 @@ class OutputOrganizationStage(ProcessingStage):
    """
    def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
        asset_name_for_log_early = context.asset_rule.asset_name if hasattr(context, 'asset_rule') and context.asset_rule else "Unknown Asset (early)"
        log.info(f"OUTPUT_ORG_DEBUG: Stage execution started for asset '{asset_name_for_log_early}'.")
        logger.debug(f"OUTPUT_ORG_DEBUG: Entry - context.output_base_path = {context.output_base_path}")
        log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj.output_directory_base (raw from config) = {getattr(context.config_obj, 'output_directory_base', 'N/A')}")
        # resolved_base = "N/A"
        # if hasattr(context.config_obj, '_settings') and context.config_obj._settings.get('OUTPUT_BASE_DIR'):
        #     base_dir_from_settings = context.config_obj._settings.get('OUTPUT_BASE_DIR')
        #     # Path resolution logic might be complex
        # log.info(f"OUTPUT_ORG_DEBUG: Received context.config_obj._settings.OUTPUT_BASE_DIR (resolved guess) = {resolved_base}")
        log.info(f"OUTPUT_ORG_DEBUG: context.processed_maps_details at start: {context.processed_maps_details}")
        """
        Copies temporary processed and merged files to their final output locations
        based on path patterns and updates AssetProcessingContext.
@@ -103,7 +111,9 @@ class OutputOrganizationStage(ProcessingStage):
                pattern_string=output_dir_pattern,
                token_data=token_data_variant_cleaned
            )
            logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Using context.output_base_path = {context.output_base_path} for final_variant_path construction.")
            final_variant_path = Path(context.output_base_path) / Path(relative_dir_path_str_variant) / Path(output_filename_variant)
            logger.debug(f"OUTPUT_ORG_DEBUG: Variants - Constructed final_variant_path = {final_variant_path}")
            final_variant_path.parent.mkdir(parents=True, exist_ok=True)

            if final_variant_path.exists() and not overwrite_existing:
@@ -169,7 +179,9 @@ class OutputOrganizationStage(ProcessingStage):
                pattern_string=output_dir_pattern,
                token_data=token_data_cleaned
            )
            logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Using context.output_base_path = {context.output_base_path} for final_path construction.")
            final_path = Path(context.output_base_path) / Path(relative_dir_path_str) / Path(output_filename)
            logger.debug(f"OUTPUT_ORG_DEBUG: SingleFile - Constructed final_path = {final_path}")
            final_path.parent.mkdir(parents=True, exist_ok=True)

            if final_path.exists() and not overwrite_existing:
@@ -245,10 +257,12 @@ class OutputOrganizationStage(ProcessingStage):
                token_data=base_token_data_cleaned
            )
            # Destination: <output_base_path>/<asset_base_output_dir_str>/<extra_subdir_name>/<original_filename>
            logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Using context.output_base_path = {context.output_base_path} for final_dest_path construction.")
            final_dest_path = (Path(context.output_base_path) /
                               Path(asset_base_output_dir_str) /
                               Path(extra_subdir_name) /
                               source_file_path.name)  # Use original filename
            logger.debug(f"OUTPUT_ORG_DEBUG: ExtraFiles - Constructed final_dest_path = {final_dest_path}")
            final_dest_path.parent.mkdir(parents=True, exist_ok=True)
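The resolver that turns output_dir_pattern plus token_data into relative_dir_path_str is not part of this hunk. A minimal sketch of how such token substitution and the final path join could fit together; the <TOKEN> placeholder syntax, token names, and example paths are assumptions for illustration only:

from pathlib import Path
from typing import Dict

def resolve_pattern(pattern_string: str, token_data: Dict[str, str]) -> str:
    """Replace <TOKEN> placeholders with values; unknown tokens stay visible in the result."""
    resolved = pattern_string
    for token, value in token_data.items():
        resolved = resolved.replace(f"<{token}>", value)
    return resolved

output_base_path = Path("D:/AssetLibrary")  # hypothetical output base
relative_dir = resolve_pattern("<ASSETNAME>/<MAPTYPE>", {"ASSETNAME": "RockCliff01", "MAPTYPE": "COL"})
final_path = output_base_path / relative_dir / "RockCliff01_COL_2K.png"
# -> D:/AssetLibrary/RockCliff01/COL/RockCliff01_COL_2K.png (separator depends on platform)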

View File

@@ -1,21 +1,69 @@
import logging
from typing import List, Union, Optional, Tuple, Dict
from pathlib import Path

from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext, MergeTaskDefinition
from rule_structure import FileRule, ProcessingItem
from processing.utils import image_processing_utils as ipu

log = logging.getLogger(__name__)


class PrepareProcessingItemsStage(ProcessingStage):
    """
    Identifies and prepares a unified list of ProcessingItem and MergeTaskDefinition objects
    to be processed in subsequent stages. Performs initial validation and explodes
    FileRules into specific ProcessingItems for each required output variant.
    """

    def _get_target_resolutions(self, source_w: int, source_h: int, config_resolutions: dict, file_rule: FileRule) -> Dict[str, int]:
        """
        Determines the target output resolutions for a given source image.
        Placeholder logic: uses all config resolutions smaller than or equal to the source, plus PREVIEW if smaller.
        Needs to be refined to consider FileRule.resolution_override and actual project requirements.
        """
        # Very basic logic for now. If the FileRule has a resolution_override (e.g. (1024, 1024)),
        # that might be the *only* target. How overrides interact with the standard named resolutions
        # is still unclear; assume for now that an override suppresses standard resolution-key
        # generation. This is likely incorrect for a full implementation.
        if file_rule.resolution_override and isinstance(file_rule.resolution_override, tuple) and len(file_rule.resolution_override) == 2:
            log.warning(f"FileRule '{file_rule.file_path}' has resolution_override. Standard resolution key generation skipped (needs design refinement).")
            return {}

        target_res = {}
        max_source_dim = max(source_w, source_h)
        for key, res_val in config_resolutions.items():
            if key == "PREVIEW":
                # Consider PREVIEW only if its value is smaller than the source.
                if res_val < max_source_dim:
                    target_res[key] = res_val
            elif res_val <= max_source_dim:
                target_res[key] = res_val

        # Ensure PREVIEW is included if it is defined and smaller than the smallest other target,
        # or if no other targets exist. This logic is still somewhat naive.
        if "PREVIEW" in config_resolutions and config_resolutions["PREVIEW"] < max_source_dim:
            non_preview_values = [v for k, v in target_res.items() if k != "PREVIEW" and isinstance(v, int)]
            if not non_preview_values or config_resolutions["PREVIEW"] < min(non_preview_values):
                target_res["PREVIEW"] = config_resolutions["PREVIEW"]
        elif "PREVIEW" in config_resolutions and not target_res:
            # Only PREVIEW is applicable (e.g. PREVIEW equals the source dimension).
            if config_resolutions["PREVIEW"] <= max_source_dim:
                target_res["PREVIEW"] = config_resolutions["PREVIEW"]

        if not target_res and max_source_dim > 0:
            log.debug(f"No standard resolutions from config are <= source dimension {max_source_dim}. Only LOWRES (if applicable) or PREVIEW (if smaller) might be generated.")

        log.debug(f"Determined target resolutions for source {source_w}x{source_h}: {target_res}")
        return target_res
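    # Worked example of the placeholder logic above, with assumed config values
    # (the real image_resolutions mapping may differ):
    #   config_resolutions = {"1K": 1024, "2K": 2048, "4K": 4096, "PREVIEW": 256}
    #   3000x3000 source, no resolution_override:
    #       "4K" (4096) exceeds the source and is skipped (no upscaling);
    #       result -> {"1K": 1024, "2K": 2048, "PREVIEW": 256}
    #   700x700 source:
    #       only "PREVIEW" (256) fits; result -> {"PREVIEW": 256}
    #       (the LOWRES variant is handled separately in execute(), not here)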
    def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
        """
        Populates context.processing_items with ProcessingItem and MergeTaskDefinition objects.
        """
        asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
        log.info(f"Asset '{asset_name_for_log}': Preparing processing items...")
@@ -25,72 +73,135 @@ class PrepareProcessingItemsStage(ProcessingStage):
            context.processing_items = []
            return context

        # Output list is now List[Union[ProcessingItem, MergeTaskDefinition]].
        items_to_process: List[Union[ProcessingItem, MergeTaskDefinition]] = []
        preparation_failed = False
        config = context.config_obj

        # --- Process FileRules into ProcessingItems ---
        if context.files_to_process:
            source_path_valid = True
            if not context.source_rule or not context.source_rule.input_path:
                log.error(f"Asset '{asset_name_for_log}': SourceRule or SourceRule.input_path is not set.")
                source_path_valid = False
                preparation_failed = True
                context.status_flags['prepare_items_failed_reason'] = "SourceRule.input_path missing"
            elif not context.workspace_path or not context.workspace_path.is_dir():
                log.error(f"Asset '{asset_name_for_log}': Workspace path '{context.workspace_path}' is invalid.")
                source_path_valid = False
                preparation_failed = True
                context.status_flags['prepare_items_failed_reason'] = "Workspace path invalid"

            if source_path_valid:
                for file_rule in context.files_to_process:
                    log_prefix_fr = f"Asset '{asset_name_for_log}', FileRule '{file_rule.file_path}'"
                    if not file_rule.file_path:
                        log.warning(f"{log_prefix_fr}: Skipping FileRule with empty file_path.")
                        continue

                    item_type = file_rule.item_type_override or file_rule.item_type
                    if not item_type or item_type == "EXTRA" or not item_type.startswith("MAP_"):
                        log.debug(f"{log_prefix_fr}: Item type is '{item_type}'. Not creating map ProcessingItems.")
                        # Optionally, create a different kind of ProcessingItem for EXTRAs if they need pipeline processing.
                        continue

                    source_image_path = context.workspace_path / file_rule.file_path
                    if not source_image_path.is_file():
                        log.error(f"{log_prefix_fr}: Source image file not found at '{source_image_path}'. Skipping this FileRule.")
                        preparation_failed = True  # Individual file errors contribute to overall stage failure
                        context.status_flags.setdefault('prepare_items_file_errors', []).append(str(source_image_path))
                        continue

                    # Load image data to get dimensions and for the LOWRES variant. This data is passed
                    # to subsequent stages via ProcessingItem. Consider caching this load if
                    # RegularMapProcessorStage also loads; for now, load here because dimensions are
                    # needed for the LOWRES decision.
                    log.debug(f"{log_prefix_fr}: Loading image from '{source_image_path}' to determine dimensions and prepare items.")
                    source_image_data = ipu.load_image(str(source_image_path))
                    if source_image_data is None:
                        log.error(f"{log_prefix_fr}: Failed to load image from '{source_image_path}'. Skipping this FileRule.")
                        preparation_failed = True
                        context.status_flags.setdefault('prepare_items_file_errors', []).append(f"Failed to load {source_image_path}")
                        continue

                    orig_h, orig_w = source_image_data.shape[:2]
                    original_dimensions_wh = (orig_w, orig_h)
                    source_bit_depth = ipu.get_image_bit_depth(str(source_image_path))  # Bit depth from file
                    source_channels = ipu.get_image_channels(source_image_data)

                    # Determine the standard resolutions to generate. This needs to be robust and consider
                    # file_rule.resolution_override; _get_target_resolutions is a placeholder for now.
                    target_resolutions = self._get_target_resolutions(orig_w, orig_h, config.image_resolutions, file_rule)

                    for res_key, _res_val in target_resolutions.items():
                        pi = ProcessingItem(
                            source_file_info_ref=str(source_image_path),  # Full path used as reference
                            map_type_identifier=item_type,
                            resolution_key=res_key,
                            image_data=source_image_data.copy(),  # Give each ProcessingItem its own copy
                            original_dimensions=original_dimensions_wh,
                            current_dimensions=original_dimensions_wh,
                            bit_depth=source_bit_depth,
                            channels=source_channels,
                            status="Pending"
                        )
                        items_to_process.append(pi)
                        log.debug(f"{log_prefix_fr}: Created standard ProcessingItem: {pi.map_type_identifier}_{pi.resolution_key}")

                    # Create a LOWRES variant if applicable.
                    if config.enable_low_resolution_fallback and max(orig_w, orig_h) < config.low_resolution_threshold:
                        # Avoid a duplicate LOWRES item if one was already created for this source
                        # (unlikely with the current placeholder _get_target_resolutions, but kept for safety).
                        is_lowres_already_added = any(
                            p.resolution_key == "LOWRES" and p.source_file_info_ref == str(source_image_path)
                            for p in items_to_process if isinstance(p, ProcessingItem)
                        )
                        if not is_lowres_already_added:
                            pi_lowres = ProcessingItem(
                                source_file_info_ref=str(source_image_path),
                                map_type_identifier=item_type,
                                resolution_key="LOWRES",
                                image_data=source_image_data.copy(),  # Fresh copy for LOWRES
                                original_dimensions=original_dimensions_wh,
                                current_dimensions=original_dimensions_wh,
                                bit_depth=source_bit_depth,
                                channels=source_channels,
                                status="Pending"
                            )
                            items_to_process.append(pi_lowres)
                            log.info(f"{log_prefix_fr}: Created LOWRES ProcessingItem because {orig_w}x{orig_h} < {config.low_resolution_threshold}px threshold.")
                        else:
                            log.debug(f"{log_prefix_fr}: LOWRES item for this source already added. Skipping duplicate LOWRES creation.")
                    elif config.enable_low_resolution_fallback:
                        log.debug(f"{log_prefix_fr}: Image {orig_w}x{orig_h} not below LOWRES threshold {config.low_resolution_threshold}px.")
            else:
                log.warning(f"Asset '{asset_name_for_log}': Skipping creation of ProcessingItems from FileRules due to invalid source/workspace path.")

        # --- Add MergeTaskDefinitions (largely unchanged) ---
        merged_tasks_list = getattr(config, 'map_merge_rules', None)
        if merged_tasks_list and isinstance(merged_tasks_list, list):
            log.debug(f"Asset '{asset_name_for_log}': Found {len(merged_tasks_list)} merge tasks in global config.")
            for task_idx, task_data in enumerate(merged_tasks_list):
                if isinstance(task_data, dict):
                    task_key = f"merged_task_{task_idx}"
                    if not task_data.get('output_map_type') or not isinstance(task_data.get('inputs'), dict):
                        log.warning(f"Asset '{asset_name_for_log}', Task Index {task_idx}: Skipping merge task due to missing 'output_map_type' or valid 'inputs'. Task data: {task_data}")
                        continue
                    merge_def = MergeTaskDefinition(task_data=task_data, task_key=task_key)
                    items_to_process.append(merge_def)
                    log.info(f"Asset '{asset_name_for_log}': Added MergeTaskDefinition: Key='{merge_def.task_key}', OutputType='{merge_def.task_data.get('output_map_type', 'N/A')}'")
                else:
                    log.warning(f"Asset '{asset_name_for_log}': Item at index {task_idx} in config.map_merge_rules is not a dict. Skipping. Item: {task_data}")
        # ... (rest of merge task handling) ...

        if not items_to_process and not preparation_failed:
            log.info(f"Asset '{asset_name_for_log}': No valid items (ProcessingItem or MergeTaskDefinition) found to process.")

        log.debug(f"Asset '{asset_name_for_log}': Final items_to_process before assigning to context: {items_to_process}")
        context.processing_items = items_to_process
        context.intermediate_results = {}  # Initialize intermediate results storage
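Putting the pieces above together, a worked example of what this stage would emit under assumed settings (the threshold and resolution values below are illustrative, not the project's defaults):

# Assumed: enable_low_resolution_fallback = True, low_resolution_threshold = 1024,
#          image_resolutions = {"1K": 1024, "2K": 2048, "PREVIEW": 256}
#
# For a 700x700 MAP_COL source file, context.processing_items would typically contain:
#   ProcessingItem(map_type_identifier="MAP_COL", resolution_key="PREVIEW", ...)  # only standard target <= 700
#   ProcessingItem(map_type_identifier="MAP_COL", resolution_key="LOWRES", ...)   # 700 < 1024 threshold
# plus one MergeTaskDefinition per valid entry in config.map_merge_rules.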

View File

@@ -37,7 +37,7 @@ class RegularMapProcessorStage(ProcessingStage):
        """
        final_internal_map_type = initial_internal_map_type  # Default
        base_map_type_match = re.match(r"(MAP_[A-Z]+)", initial_internal_map_type)
        if not base_map_type_match or not asset_rule or not asset_rule.files:
            return final_internal_map_type  # Cannot determine suffix without base type or asset rule files
@@ -47,7 +47,7 @@ class RegularMapProcessorStage(ProcessingStage):
        peers_of_same_base_type = []
        for fr_asset in asset_rule.files:
            fr_asset_item_type = fr_asset.item_type_override or fr_asset.item_type or "UnknownMapType"
            fr_asset_base_match = re.match(r"(MAP_[A-Z]+)", fr_asset_item_type)
            if fr_asset_base_match and fr_asset_base_match.group(1) == true_base_map_type:
                peers_of_same_base_type.append(fr_asset)
@@ -197,10 +197,17 @@ class RegularMapProcessorStage(ProcessingStage):
            result.final_internal_map_type = final_map_type  # Update if Gloss->Rough changed it
            result.transformations_applied = transform_notes

            # --- Determine Resolution Key for LOWRES ---
            if config.enable_low_resolution_fallback and result.original_dimensions:
                w, h = result.original_dimensions
                if max(w, h) < config.low_resolution_threshold:
                    result.resolution_key = "LOWRES"
                    log.info(f"{log_prefix}: Image dimensions ({w}x{h}) are below threshold ({config.low_resolution_threshold}px). Flagging as LOWRES.")

            # --- Success ---
            result.status = "Processed"
            result.error_message = None
            log.info(f"{log_prefix}: Successfully processed regular map. Final type: '{result.final_internal_map_type}', ResolutionKey: {result.resolution_key}.")

        except Exception as e:
            log.exception(f"{log_prefix}: Unhandled exception during processing: {e}")
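The widened regex (from MAP_[A-Z]{3} to MAP_[A-Z]+) matters for base types longer than three letters; a small illustration (the map-type names are representative examples, not an exhaustive list):

import re

for map_type in ("MAP_COL", "MAP_ROUGH", "MAP_NRM_GL"):
    old = re.match(r"(MAP_[A-Z]{3})", map_type).group(1)
    new = re.match(r"(MAP_[A-Z]+)", map_type).group(1)
    print(f"{map_type}: old pattern -> {old}, new pattern -> {new}")

# MAP_COL:    old pattern -> MAP_COL, new pattern -> MAP_COL
# MAP_ROUGH:  old pattern -> MAP_ROU, new pattern -> MAP_ROUGH  (old pattern truncated the base type)
# MAP_NRM_GL: old pattern -> MAP_NRM, new pattern -> MAP_NRM    (the underscore still terminates the match)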

View File

@@ -23,8 +23,17 @@ class SaveVariantsStage(ProcessingStage):
        Calls isu.save_image_variants with data from input_data.
        """
        internal_map_type = input_data.internal_map_type
        # The input_data for SaveVariantsStage does not directly contain the ProcessingItem;
        # it receives data *derived* from a ProcessingItem by previous stages. For debugging,
        # more context would need to be passed, or we rely on output_filename_pattern_tokens.
        resolution_key_from_tokens = input_data.output_filename_pattern_tokens.get('resolution', 'UnknownResKey')
        log_prefix = f"Save Variants Stage (Type: {internal_map_type}, ResKey: {resolution_key_from_tokens})"
        log.info(f"{log_prefix}: Starting.")
        log.debug(f"{log_prefix}: Input image_data shape: {input_data.image_data.shape if input_data.image_data is not None else 'None'}")
        log.debug(f"{log_prefix}: Input source_bit_depth_info: {input_data.source_bit_depth_info}")
        log.debug(f"{log_prefix}: Configured image_resolutions for saving: {input_data.image_resolutions}")
        log.debug(f"{log_prefix}: Output filename pattern tokens: {input_data.output_filename_pattern_tokens}")

        # Initialize output object with default failure state
        result = SaveVariantsOutput(
@@ -64,11 +73,11 @@ class SaveVariantsStage(ProcessingStage):
            "resolution_threshold_for_jpg": input_data.resolution_threshold_for_jpg,
        }
        log.debug(f"{log_prefix}: Calling save_image_variants utility with args: {save_args}")
        saved_files_details: List[Dict] = isu.save_image_variants(**save_args)

        if saved_files_details:
            log.info(f"{log_prefix}: Save utility completed successfully. Saved {len(saved_files_details)} variants: {[details.get('filepath') for details in saved_files_details]}")
            result.saved_files_details = saved_files_details
            result.status = "Processed"
            result.error_message = None

View File

@@ -194,6 +194,16 @@ def get_image_bit_depth(image_path_str: str) -> Optional[int]:
        print(f"Error getting bit depth for {image_path_str}: {e}")
        return None


def get_image_channels(image_data: np.ndarray) -> Optional[int]:
    """Determines the number of channels in an image."""
    if image_data is None:
        return None
    if len(image_data.shape) == 2:  # Grayscale
        return 1
    elif len(image_data.shape) == 3:  # Color
        return image_data.shape[2]
    return None  # Unknown shape


def calculate_image_stats(image_data: np.ndarray) -> Optional[Dict]:
    """
    Calculates min, max, mean for a given numpy image array.
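A quick usage note for the new helper (the arrays below are synthetic examples, not project data):

import numpy as np

gray = np.zeros((8, 8), dtype=np.uint8)      # 2-D array -> 1 channel
rgba = np.zeros((8, 8, 4), dtype=np.uint8)   # 3-D array -> shape[2] channels
# get_image_channels(gray) -> 1
# get_image_channels(rgba) -> 4
# get_image_channels(None) -> None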

44
projectBrief.md Normal file
View File

@@ -0,0 +1,44 @@
# Project Brief: Asset Processor Tool
## 1. Main Goal & Purpose
The primary goal of the Asset Processor Tool is to provide **CG artists and 3D content teams with a friendly, fast, and flexible interface to process and organize 3D asset source files into a standardized library format.** It automates repetitive and complex tasks involved in preparing assets from various suppliers for use in production pipelines.
## 2. Key Features & Components
* **Automated Asset Processing:** Ingests 3D asset source files (texture sets, models, etc.) from `.zip`, `.rar`, `.7z` archives, or folders.
* **Preset-Driven Workflow:** Utilizes configurable JSON presets to interpret different asset sources (e.g., from various online vendors or internal standards), defining rules for file classification and processing.
* **Comprehensive File Operations:**
* **Classification:** Automatically identifies map types (Color, Normal, Roughness, etc.), models, and other file categories based on preset rules.
* **Image Processing:** Performs tasks like image resizing (to standard resolutions like 1K, 2K, 4K, avoiding upscaling), glossiness-to-roughness conversion, normal map green channel inversion (OpenGL/DirectX handling), alpha channel extraction, bit-depth adjustments, and low-resolution fallback generation for small source images.
* **Channel Merging:** Combines channels from different source maps into packed textures (e.g., Normal + Roughness + Metallic into a single NRMRGH map).
* **Metadata Generation:** Creates a detailed `metadata.json` file for each processed asset, containing information about maps, categories, processing settings, and more, for downstream tool integration.
* **Flexible Output Organization:** Generates a clean, structured output directory based on user-configurable naming patterns and tokens.
* **Multiple User Interfaces:**
* **Graphical User Interface (GUI):** The primary interface, designed to be user-friendly, offering drag-and-drop functionality, an integrated preset editor, a live preview table for rule validation and overrides, and clear processing controls.
* **Directory Monitor:** An automated script that watches a specified folder for new asset archives and processes them based on preset names embedded in the archive filename.
* **Command-Line Interface (CLI):** Intended for batch processing and scripting (currently with limited core functionality).
* **Optional Blender Integration:** Can automatically run Blender scripts post-processing to create PBR node groups and materials in specified `.blend` files, linking to the newly processed textures.
* **Hierarchical Rule System:** Allows for dynamic, granular overrides of preset configurations at the source, asset, or individual file level via the GUI.
* **Experimental LLM Prediction:** Includes an option to use a Large Language Model for file interpretation and rule prediction.
## 3. Target Audience
* **CG Artists:** Individual artists looking for an efficient way to manage and prepare their personal or downloaded asset libraries.
* **3D Content Creation Teams:** Studios or groups needing a standardized pipeline for processing and organizing assets from multiple sources.
* **Technical Artists/Pipeline Developers:** Who may extend or integrate the tool into broader production workflows.
## 4. Overall Architectural Style & Key Technologies
* **Core Language:** Python
* **GUI Framework:** PySide6
* **Configuration:** Primarily JSON-based (application settings, user overrides, type definitions, supplier settings, presets, LLM settings).
* **Processing Architecture:** A modular, staged processing pipeline orchestrated by a central engine. Each stage performs a discrete task on an `AssetProcessingContext` object.
* **Key Libraries:** OpenCV (image processing), NumPy (numerical operations), py7zr/rarfile (archive handling), watchdog (directory monitoring).
* **Design Principles:** Modularity, configurability, and user-friendliness (especially for the GUI).
## 5. Foundational Information
* The tool aims to significantly reduce manual effort and ensure consistency in asset preparation.
* It is designed to be adaptable to various asset sources and pipeline requirements through its extensive configuration options and preset system.
* The output `metadata.json` is key for enabling further automation and integration with other tools or digital content creation (DCC) applications.
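The channel-merging feature described above (e.g. packing Normal + Roughness + Metallic into an NRMRGH map) amounts to stacking single channels from several source maps into one array; a minimal NumPy sketch, with the channel layout assumed purely for illustration:

import numpy as np

def pack_nrmrgh(normal_rgb: np.ndarray, roughness: np.ndarray, metallic: np.ndarray) -> np.ndarray:
    """Assumed layout: R/G = normal X/Y, B = roughness, A = metallic. Inputs must share a resolution."""
    h, w = roughness.shape[:2]
    packed = np.zeros((h, w, 4), dtype=normal_rgb.dtype)
    packed[..., 0] = normal_rgb[..., 0]  # normal X
    packed[..., 1] = normal_rgb[..., 1]  # normal Y
    packed[..., 2] = roughness           # roughness
    packed[..., 3] = metallic            # metallic
    return packed

nrm = np.zeros((256, 256, 3), dtype=np.uint8)
rgh = np.full((256, 256), 128, dtype=np.uint8)
mtl = np.zeros((256, 256), dtype=np.uint8)
print(pack_nrmrgh(nrm, rgh, mtl).shape)  # (256, 256, 4)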

View File

@@ -1,6 +1,7 @@
import dataclasses
import json
from typing import List, Dict, Any, Tuple, Optional
import numpy as np  # Needed for ProcessingItem

@dataclasses.dataclass
class FileRule:
    file_path: str = None
@@ -10,8 +11,12 @@ class FileRule:
    resolution_override: Tuple[int, int] = None
    channel_merge_instructions: Dict[str, Any] = dataclasses.field(default_factory=dict)
    output_format_override: str = None
    processing_items: List['ProcessingItem'] = dataclasses.field(default_factory=list)

    def to_json(self) -> str:
        # ProcessingItem serialization needs care if it contains non-serializable types like np.ndarray.
        # For now, assume asdict handles it or that it is handled before calling to_json for persistence.
        # A custom asdict_factory might be needed for robust serialization.
        return json.dumps(dataclasses.asdict(self), indent=4)

    @classmethod
@@ -54,4 +59,43 @@ class SourceRule:
        data = json.loads(json_string)
        # Manually deserialize nested AssetRule objects
        data['assets'] = [AssetRule.from_json(json.dumps(asset_data)) for asset_data in data.get('assets', [])]
        # ProcessingItem deserialization would need handling if it was serialized;
        # for now, FileRule.from_json does not explicitly handle processing_items from JSON.
        return cls(**data)


@dataclasses.dataclass
class ProcessingItem:
    """
    Represents a specific version of an image map to be processed and saved.
    This could be a standard resolution (1K, 2K), a preview, or a special
    variant like 'LOWRES'.
    """
    source_file_info_ref: str             # Reference to the original SourceFileInfo or unique ID of the source image
    map_type_identifier: str              # The internal map type (e.g., "MAP_COL", "MAP_ROUGH")
    resolution_key: str                   # The resolution identifier (e.g., "1K", "PREVIEW", "LOWRES")
    image_data: np.ndarray                # The actual image data for this item
    original_dimensions: Tuple[int, int]  # (width, height) of the source image for this item
    current_dimensions: Tuple[int, int]   # (width, height) of the image_data in this item
    target_filename: str = ""             # Populated by SaveVariantsStage
    is_extra: bool = False                # Whether this item should be treated as an 'extra' file
    bit_depth: Optional[int] = None
    channels: Optional[int] = None
    file_extension: Optional[str] = None  # Determined during saving based on format
    processing_applied_log: List[str] = dataclasses.field(default_factory=list)
    status: str = "Pending"               # e.g., Pending, Processed, Failed
    error_message: Optional[str] = None

    # __getstate__ and __setstate__ may be needed if these objects are pickled and
    # np.ndarray causes issues. For JSON, image_data would typically not be serialized.
    def __getstate__(self):
        state = self.__dict__.copy()
        # Don't pickle image_data if it is large or not needed for state.
        if 'image_data' in state:
            del state['image_data']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Handle missing 'image_data' (or load it if a path was stored instead).
        if 'image_data' not in self.__dict__:
            self.image_data = None
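The serialization caveat in FileRule.to_json above (np.ndarray fields such as ProcessingItem.image_data are not JSON-serializable) could be addressed with a custom dict factory; a minimal sketch under that assumption, not the project's actual implementation:

import dataclasses
import json
import numpy as np

def _json_safe_factory(field_pairs):
    """dict_factory for dataclasses.asdict(): drop ndarray values, keep everything else."""
    return {key: value for key, value in field_pairs if not isinstance(value, np.ndarray)}

def item_to_json(item) -> str:
    """Serialize a ProcessingItem-like dataclass, omitting the raw image_data payload."""
    return json.dumps(dataclasses.asdict(item, dict_factory=_json_safe_factory), indent=4)

# Example with a stand-in dataclass (the real ProcessingItem carries more fields):
@dataclasses.dataclass
class _DemoItem:
    map_type_identifier: str
    resolution_key: str
    image_data: np.ndarray

print(item_to_json(_DemoItem("MAP_COL", "1K", np.zeros((2, 2), dtype=np.uint8))))
# image_data is omitted from the resulting JSON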