Processing-Refactor #63

Merged
Rusfort merged 16 commits from Processing-Refactor into Dev 2025-05-13 09:25:09 +02:00
Showing only changes of commit b441174076 - Show all commits

View File

@ -1,69 +1,95 @@
# Developer Guide: Processing Pipeline
This document details the step-by-step technical process executed by the asset processing pipeline, which is initiated by the `ProcessingEngine` class (`processing_engine.py`) and orchestrated by the `PipelineOrchestrator` (`processing/pipeline/orchestrator.py`).
This document details the step-by-step technical process executed by the asset processing pipeline, which is initiated by the [`ProcessingEngine`](processing_engine.py:73) class (`processing_engine.py`) and orchestrated by the [`PipelineOrchestrator`](processing/pipeline/orchestrator.py:36) (`processing/pipeline/orchestrator.py`).
The `ProcessingEngine.process()` method serves as the main entry point. It initializes a `PipelineOrchestrator` instance, providing it with the application's `Configuration` object and a predefined list of processing stages. The `PipelineOrchestrator.process_source_rule()` method then manages the execution of these stages for each asset defined in the input `SourceRule`.
The [`ProcessingEngine.process()`](processing_engine.py:131) method serves as the main entry point. It initializes a [`PipelineOrchestrator`](processing/pipeline/orchestrator.py:36) instance, providing it with the application's [`Configuration`](configuration.py:68) object and predefined lists of pre-item and post-item processing stages. The [`PipelineOrchestrator.process_source_rule()`](processing/pipeline/orchestrator.py:95) method then manages the execution of these stages for each asset defined in the input [`SourceRule`](rule_structure.py:40).
A crucial component in this architecture is the `AssetProcessingContext` (`processing/pipeline/asset_context.py`). An instance of this dataclass is created for each `AssetRule` being processed. It acts as a stateful container, carrying all relevant data (source files, rules, configuration, intermediate results, metadata) and is passed sequentially through each stage. Each stage can read from and write to the context, allowing data to flow and be modified throughout the pipeline.
A crucial component in this architecture is the [`AssetProcessingContext`](processing/pipeline/asset_context.py:86) (`processing/pipeline/asset_context.py`). An instance of this dataclass is created for each [`AssetRule`](rule_structure.py:22) being processed. It acts as a stateful container, carrying all relevant data (source files, rules, configuration, intermediate results, metadata) and is passed sequentially through each stage. Each stage can read from and write to the context, allowing data to flow and be modified throughout the pipeline.
The pipeline stages are executed in the following order:
The pipeline execution for each asset follows this general flow:
1. **`SupplierDeterminationStage` (`processing/pipeline/stages/supplier_determination.py`)**:
* **Responsibility**: Determines the effective supplier for the asset based on the `SourceRule`'s `supplier_identifier`, `supplier_override`, and supplier definitions in the `Configuration`.
* **Context Interaction**: Updates `AssetProcessingContext.effective_supplier` and potentially `AssetProcessingContext.asset_metadata` with supplier information.
1. **Pre-Item Stages:** A sequence of stages executed once per asset before the core item processing loop. These stages typically perform initial setup, filtering, and asset-level transformations.
2. **Core Item Processing Loop:** The [`PipelineOrchestrator`](processing/pipeline/orchestrator.py:36) iterates through a list of "processing items" (individual files or merge tasks) prepared by a dedicated stage. For each item, a sequence of core processing stages is executed.
3. **Post-Item Stages:** A sequence of stages executed once per asset after the core item processing loop is complete. These stages handle final tasks like organizing output files and saving metadata.
2. **`AssetSkipLogicStage` (`processing/pipeline/stages/asset_skip_logic.py`)**:
* **Responsibility**: Checks if the asset should be skipped, typically if the output already exists and overwriting is not forced.
* **Context Interaction**: Sets `AssetProcessingContext.status_flags['skip_asset']` to `True` if the asset should be skipped, halting further processing for this asset by the orchestrator.
## Pipeline Stages
3. **`MetadataInitializationStage` (`processing/pipeline/stages/metadata_initialization.py`)**:
* **Responsibility**: Initializes the `AssetProcessingContext.asset_metadata` dictionary with base information derived from the `AssetRule`, `SourceRule`, and `Configuration`. This includes asset name, type, and any common metadata.
* **Context Interaction**: Populates `AssetProcessingContext.asset_metadata`.
The stages are executed in the following order for each asset:
4. **`FileRuleFilterStage` (`processing/pipeline/stages/file_rule_filter.py`)**:
* **Responsibility**: Filters the `FileRule` objects from the `AssetRule` to determine which files should actually be processed. It respects `FILE_IGNORE` rules.
* **Context Interaction**: Populates `AssetProcessingContext.files_to_process` with the list of `FileRule` objects that passed the filter.
### Pre-Item Stages
5. **`GlossToRoughConversionStage` (`processing/pipeline/stages/gloss_to_rough_conversion.py`)**:
* **Responsibility**: Identifies gloss maps (based on `FileRule` properties and filename conventions) that are intended to be used as roughness maps. If found, it loads the image, inverts its colors, and saves a temporary inverted version.
* **Context Interaction**: Modifies `FileRule` objects in `AssetProcessingContext.files_to_process` (e.g., updates `file_path` to point to the temporary inverted map, sets flags indicating inversion). Updates `AssetProcessingContext.processed_maps_details` with information about the conversion.
These stages are executed sequentially once for each asset before the core item processing loop begins.
6. **`AlphaExtractionToMaskStage` (`processing/pipeline/stages/alpha_extraction_to_mask.py`)**:
* **Responsibility**: If a `FileRule` specifies alpha channel extraction (e.g., from a diffuse map to create an opacity mask), this stage loads the source image, extracts its alpha channel, and saves it as a new temporary grayscale map.
* **Context Interaction**: May add new `FileRule`-like entries or details to `AssetProcessingContext.processed_maps_details` representing the extracted mask.
1. **[`SupplierDeterminationStage`](processing/pipeline/stages/supplier_determination.py:6)** (`processing/pipeline/stages/supplier_determination.py`):
* **Responsibility**: Determines the effective supplier for the asset based on the [`SourceRule`](rule_structure.py:40)'s `supplier_override`, `supplier_identifier`, and validation against configured suppliers.
* **Context Interaction**: Sets `context.effective_supplier` and may set a `supplier_error` flag in `context.status_flags`.
7. **`NormalMapGreenChannelStage` (`processing/pipeline/stages/normal_map_green_channel.py`)**:
* **Responsibility**: Checks `FileRule`s for normal maps and, based on configuration (e.g., `invert_normal_map_green_channel` for a specific supplier), potentially inverts the green channel of the normal map image.
* **Context Interaction**: Modifies the image data for normal maps if inversion is needed, saving a new temporary version. Updates `AssetProcessingContext.processed_maps_details`.
2. **[`AssetSkipLogicStage`](processing/pipeline/stages/asset_skip_logic.py:5)** (`processing/pipeline/stages/asset_skip_logic.py`):
* **Responsibility**: Checks if the entire asset should be skipped based on conditions like a missing/invalid supplier, a "SKIP" status in asset metadata, or if the asset is already processed and overwrite is disabled.
* **Context Interaction**: Sets the `skip_asset` flag and `skip_reason` in `context.status_flags` if the asset should be skipped.
8. **`IndividualMapProcessingStage` (`processing/pipeline/stages/individual_map_processing.py`)**:
* **Responsibility**: Processes individual texture map files. This includes:
* Loading the source image.
* Applying Power-of-Two (POT) scaling.
* Generating multiple resolution variants based on configuration.
* Handling color space conversions (e.g., BGR to RGB).
* Calculating image statistics (min, max, mean, median).
* Determining and storing aspect ratio change information.
* Saving processed temporary map files.
* Applying name variant suffixing and using standard type aliases for filenames.
* **Context Interaction**: Heavily populates `AssetProcessingContext.processed_maps_details` with paths to temporary processed files, dimensions, and other metadata for each map and its variants. Updates `AssetProcessingContext.asset_metadata` with image stats and aspect ratio info.
3. **[`MetadataInitializationStage`](processing/pipeline/stages/metadata_initialization.py:81)** (`processing/pipeline/stages/metadata_initialization.py`):
* **Responsibility**: Initializes the `context.asset_metadata` dictionary with base information derived from the [`AssetRule`](rule_structure.py:22), [`SourceRule`](rule_structure.py:40), and [`Configuration`](configuration.py:68). This includes asset name, IDs, source/output paths, timestamps, and initial status.
* **Context Interaction**: Populates `context.asset_metadata` and initializes empty dictionaries for `processed_maps_details` and `merged_maps_details`.
9. **`MapMergingStage` (`processing/pipeline/stages/map_merging.py`)**:
* **Responsibility**: Performs channel packing and other merge operations based on `map_merge_rules` defined in the `Configuration`.
* **Context Interaction**: Reads source map details and temporary file paths from `AssetProcessingContext.processed_maps_details`. Saves new temporary merged maps and records their details in `AssetProcessingContext.merged_maps_details`.
4. **[`FileRuleFilterStage`](processing/pipeline/stages/file_rule_filter.py:10)** (`processing/pipeline/stages/file_rule_filter.py`):
* **Responsibility**: Filters the [`FileRule`](rule_structure.py:5) objects associated with the asset to determine which individual files should be considered for processing. It identifies and excludes files matching "FILE_IGNORE" rules.
* **Context Interaction**: Populates `context.files_to_process` with the list of [`FileRule`](rule_structure.py:5) objects that are not ignored.
10. **`MetadataFinalizationAndSaveStage` (`processing/pipeline/stages/metadata_finalization_save.py`)**:
* **Responsibility**: Collects all accumulated metadata from `AssetProcessingContext.asset_metadata`, `AssetProcessingContext.processed_maps_details`, and `AssetProcessingContext.merged_maps_details`. It structures this information and saves it as the `metadata.json` file in a temporary location within the engine's temporary directory.
* **Context Interaction**: Reads from various context fields and writes the `metadata.json` file. Stores the path to this temporary metadata file in the context (e.g., `AssetProcessingContext.asset_metadata['temp_metadata_path']`).
5. **[`GlossToRoughConversionStage`](processing/pipeline/stages/gloss_to_rough_conversion.py:15)** (`processing/pipeline/stages/gloss_to_rough_conversion.py`):
* **Responsibility**: Identifies processed maps that were originally glossiness maps. If found, it loads the temporary image data, inverts it, saves a new temporary roughness map, and updates the corresponding details in `context.processed_maps_details` and the relevant [`FileRule`](rule_structure.py:5) in `context.files_to_process`.
* **Context Interaction**: Reads from and updates `context.processed_maps_details` and `context.files_to_process`.
11. **`OutputOrganizationStage` (`processing/pipeline/stages/output_organization.py`)**:
* **Responsibility**: Determines final output paths for all processed maps, merged maps, the metadata file, and any other asset files (like models). It then copies these files from their temporary locations to the final structured output directory.
* **Context Interaction**: Reads temporary file paths from `AssetProcessingContext.processed_maps_details`, `AssetProcessingContext.merged_maps_details`, and the temporary metadata file path. Uses `Configuration` for output path patterns. Updates `AssetProcessingContext.asset_metadata` with final file paths and status.
6. **[`AlphaExtractionToMaskStage`](processing/pipeline/stages/alpha_extraction_to_mask.py:16)** (`processing/pipeline/stages/alpha_extraction_to_mask.py`):
* **Responsibility**: If no mask map is explicitly defined for the asset, this stage searches for a suitable source map (e.g., Albedo, Diffuse) with an alpha channel in `context.processed_maps_details`. If found, it extracts the alpha channel, saves it as a new temporary mask map, and adds a new [`FileRule`](rule_structure.py:5) and corresponding details to the context.
* **Context Interaction**: Reads from `context.processed_maps_details`, adds a new [`FileRule`](rule_structure.py:5) to `context.files_to_process`, and adds a new entry to `context.processed_maps_details`.
**External Steps (Not part of `PipelineOrchestrator`'s direct loop but integral to the overall process):**
7. **[`NormalMapGreenChannelStage`](processing/pipeline/stages/normal_map_green_channel.py:14)** (`processing/pipeline/stages/normal_map_green_channel.py`):
* **Responsibility**: Identifies processed normal maps in `context.processed_maps_details`. If the global `invert_normal_map_green_channel_globally` configuration is true, it loads the temporary image data, inverts the green channel, saves a new temporary modified normal map, and updates the corresponding details in `context.processed_maps_details`.
* **Context Interaction**: Reads from and updates `context.processed_maps_details`.
* **Workspace Preparation and Cleanup**: Handled by the code that invokes `ProcessingEngine.process()` (e.g., `main.ProcessingTask`, `monitor._process_archive_task`), typically using `utils.workspace_utils`. The engine itself creates a sub-temporary directory (`engine_temp_dir`) within the workspace provided to it by the orchestrator, which it cleans up.
* **Prediction and Rule Generation**: Also external, performed before `ProcessingEngine` is called. Generates the `SourceRule`.
* **Optional Blender Script Execution**: Triggered externally after successful processing.
### Core Item Processing Loop
This staged pipeline provides a modular and extensible architecture for asset processing, with clear separation of concerns for each step. The `AssetProcessingContext` ensures that data flows consistently between these stages.r
The [`PipelineOrchestrator`](processing/pipeline/orchestrator.py:36) iterates through the `context.processing_items` list (populated by the [`PrepareProcessingItemsStage`](processing/pipeline/stages/prepare_processing_items.py:10)). For each item (either a [`FileRule`](rule_structure.py:5) for a regular map or a [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16) for a merged map), the following stages are executed sequentially:
1. **[`PrepareProcessingItemsStage`](processing/pipeline/stages/prepare_processing_items.py:10)** (`processing/pipeline/stages/prepare_processing_items.py`):
* **Responsibility**: (Executed once before the loop) Creates the `context.processing_items` list by combining [`FileRule`](rule_structure.py:5)s from `context.files_to_process` and [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16)s derived from the global `merged_image_tasks` configuration. Initializes `context.intermediate_results`.
* **Context Interaction**: Populates `context.processing_items` and initializes `context.intermediate_results`.
2. **[`RegularMapProcessorStage`](processing/pipeline/stages/regular_map_processor.py:18)** (`processing/pipeline/stages/regular_map_processor.py`):
* **Responsibility**: (Executed per [`FileRule`](rule_structure.py:5) item) Loads the image data for a single file, determines its potentially suffixed internal map type, applies in-memory transformations (Gloss-to-Rough, Normal Green Invert), and returns the processed image data and details in a [`ProcessedRegularMapData`](processing/pipeline/asset_context.py:23) object.
* **Context Interaction**: Reads from the input [`FileRule`](rule_structure.py:5) and [`Configuration`](configuration.py:68). Returns a [`ProcessedRegularMapData`](processing/pipeline/asset_context.py:23) object which is stored in `context.intermediate_results`.
3. **[`MergedTaskProcessorStage`](processing/pipeline/stages/merged_task_processor.py:68)** (`processing/pipeline/stages/merged_task_processor.py`):
* **Responsibility**: (Executed per [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16) item) Loads and prepares multiple input images based on the merge task definition (including fallbacks and in-memory transformations), handles dimension mismatches, performs the channel merging operation, and returns the merged image data and details in a [`ProcessedMergedMapData`](processing/pipeline/asset_context.py:35) object.
* **Context Interaction**: Reads from the input [`MergeTaskDefinition`](processing/pipeline/asset_context.py:16), `context.workspace_path`, and [`Configuration`](configuration.py:68). Returns a [`ProcessedMergedMapData`](processing/pipeline/asset_context.py:35) object which is stored in `context.intermediate_results`.
4. **[`InitialScalingStage`](processing/pipeline/stages/initial_scaling.py:14)** (`processing/pipeline/stages/initial_scaling.py`):
* **Responsibility**: (Executed per item) Applies initial scaling (e.g., Power-of-Two downscaling) to the image data from the previous processing stage based on the `initial_scaling_mode` configuration.
* **Context Interaction**: Takes a [`InitialScalingInput`](processing/pipeline/asset_context.py:46) (containing image data and config) and returns an [`InitialScalingOutput`](processing/pipeline/asset_context.py:54) object, which updates the item's entry in `context.intermediate_results`.
5. **[`SaveVariantsStage`](processing/pipeline/stages/save_variants.py:15)** (`processing/pipeline/stages/save_variants.py`):
* **Responsibility**: (Executed per item) Takes the final processed image data (potentially scaled) and configuration, and calls a utility to save the image to temporary files in various resolutions and formats as defined by the configuration.
* **Context Interaction**: Takes a [`SaveVariantsInput`](processing/pipeline/asset_context.py:61) object. Returns a [`SaveVariantsOutput`](processing/pipeline/asset_context.py:79) object containing details about the saved temporary files. The orchestrator stores these details in `context.processed_maps_details` for the item.
### Post-Item Stages
These stages are executed sequentially once for each asset after the core item processing loop has finished for all items.
1. **[`OutputOrganizationStage`](processing/pipeline/stages/output_organization.py:14)** (`processing/pipeline/stages/output_organization.py`):
* **Responsibility**: Determines the final output paths for all processed maps (including variants) and extra files based on configured patterns. It copies the temporary files generated by the core stages to these final destinations, creating directories as needed and respecting overwrite settings.
* **Context Interaction**: Reads from `context.processed_maps_details`, `context.files_to_process` (for 'EXTRA' files), `context.output_base_path`, and [`Configuration`](configuration.py:68). Updates entries in `context.processed_maps_details` with final paths and organization status. Populates `context.asset_metadata['final_output_files']`.
2. **[`MetadataFinalizationAndSaveStage`](processing/pipeline/stages/metadata_finalization_save.py:14)** (`processing/pipeline/stages/metadata_finalization_save.py`):
* **Responsibility**: Finalizes the `context.asset_metadata` (setting end time, final status based on flags). It restructures the processed map details for inclusion, determines the save path for the metadata file based on configuration and patterns, serializes the metadata to JSON, and saves the `metadata.json` file to the final output location.
* **Context Interaction**: Reads from `context.asset_metadata`, `context.processed_maps_details`, `context.merged_maps_details`, `context.output_base_path`, and [`Configuration`](configuration.py:68). Writes the `metadata.json` file and updates `context.asset_metadata` with its final path and status.
## External Steps
Certain steps are integral to the overall asset processing workflow but are handled outside the [`PipelineOrchestrator`](processing/pipeline/orchestrator.py:36)'s direct execution loop:
* **Workspace Preparation and Cleanup**: Handled by the code that invokes [`ProcessingEngine.process()`](processing_engine.py:131) (e.g., `main.ProcessingTask`, `monitor._process_archive_task`), typically involving extracting archives and setting up temporary directories. The engine itself manages a sub-temporary directory (`engine_temp_dir`) for intermediate processing files.
* **Prediction and Rule Generation**: Performed before the [`ProcessingEngine`](processing_engine.py:73) is called. This involves analyzing source files and generating the [`SourceRule`](rule_structure.py:40) object with its nested [`AssetRule`](rule_structure.py:22)s and [`FileRule`](rule_structure.py:5)s, often involving prediction logic (potentially using LLMs).
* **Optional Blender Script Execution**: Can be triggered externally after successful processing to perform tasks like material setup in Blender using the generated output files and metadata.
This staged pipeline provides a modular and extensible architecture for asset processing, with clear separation of concerns for each step. The [`AssetProcessingContext`](processing/pipeline/asset_context.py:86) ensures that data flows consistently between these stages.