14 KiB
Developer Guide: Processing Pipeline
This document details the step-by-step technical process executed by the asset processing pipeline, which is initiated by the ProcessingEngine class (processing_engine.py) and orchestrated by the PipelineOrchestrator (processing/pipeline/orchestrator.py).
The ProcessingEngine.process() method serves as the main entry point. It initializes a PipelineOrchestrator instance, providing it with the application's Configuration object and predefined lists of pre-item and post-item processing stages. The PipelineOrchestrator.process_source_rule() method then manages the execution of these stages for each asset defined in the input SourceRule.
A crucial component in this architecture is the AssetProcessingContext (processing/pipeline/asset_context.py). An instance of this dataclass is created for each AssetRule being processed. It acts as a stateful container, carrying all relevant data (source files, rules, configuration, intermediate results, metadata) and is passed sequentially through each stage. Each stage can read from and write to the context, allowing data to flow and be modified throughout the pipeline.
The pipeline execution for each asset follows this general flow:
- Pre-Item Stages: A sequence of stages executed once per asset before the core item processing loop. These stages typically perform initial setup, filtering, and asset-level transformations.
- Core Item Processing Loop: The
PipelineOrchestratoriterates through a list of "processing items" (individual files or merge tasks) prepared by a dedicated stage. For each item, a sequence of core processing stages is executed. - Post-Item Stages: A sequence of stages executed once per asset after the core item processing loop is complete. These stages handle final tasks like organizing output files and saving metadata.
Pipeline Stages
The stages are executed in the following order for each asset:
Pre-Item Stages
These stages are executed sequentially once for each asset before the core item processing loop begins.
-
SupplierDeterminationStage(processing/pipeline/stages/supplier_determination.py):- Responsibility: Determines the effective supplier for the asset based on the
SourceRule'ssupplier_override,supplier_identifier, and validation against configured suppliers. - Context Interaction: Sets
context.effective_supplierand may set asupplier_errorflag incontext.status_flags.
- Responsibility: Determines the effective supplier for the asset based on the
-
AssetSkipLogicStage(processing/pipeline/stages/asset_skip_logic.py):- Responsibility: Checks if the entire asset should be skipped based on conditions like a missing/invalid supplier, a "SKIP" status in asset metadata, or if the asset is already processed and overwrite is disabled.
- Context Interaction: Sets the
skip_assetflag andskip_reasonincontext.status_flagsif the asset should be skipped.
-
MetadataInitializationStage(processing/pipeline/stages/metadata_initialization.py):- Responsibility: Initializes the
context.asset_metadatadictionary with base information derived from theAssetRule,SourceRule, andConfiguration. This includes asset name, IDs, source/output paths, timestamps, and initial status. - Context Interaction: Populates
context.asset_metadataand initializes empty dictionaries forprocessed_maps_detailsandmerged_maps_details.
- Responsibility: Initializes the
-
FileRuleFilterStage(processing/pipeline/stages/file_rule_filter.py):- Responsibility: Filters the
FileRuleobjects associated with the asset to determine which individual files should be considered for processing. It identifies and excludes files matching "FILE_IGNORE" rules. - Context Interaction: Populates
context.files_to_processwith the list ofFileRuleobjects that are not ignored.
- Responsibility: Filters the
-
GlossToRoughConversionStage(processing/pipeline/stages/gloss_to_rough_conversion.py):- Responsibility: Identifies processed maps that were originally glossiness maps. If found, it loads the temporary image data, inverts it, saves a new temporary roughness map, and updates the corresponding details in
context.processed_maps_detailsand the relevantFileRuleincontext.files_to_process. - Context Interaction: Reads from and updates
context.processed_maps_detailsandcontext.files_to_process.
- Responsibility: Identifies processed maps that were originally glossiness maps. If found, it loads the temporary image data, inverts it, saves a new temporary roughness map, and updates the corresponding details in
-
AlphaExtractionToMaskStage(processing/pipeline/stages/alpha_extraction_to_mask.py):- Responsibility: If no mask map is explicitly defined for the asset, this stage searches for a suitable source map (e.g., Albedo, Diffuse) with an alpha channel in
context.processed_maps_details. If found, it extracts the alpha channel, saves it as a new temporary mask map, and adds a newFileRuleand corresponding details to the context. - Context Interaction: Reads from
context.processed_maps_details, adds a newFileRuletocontext.files_to_process, and adds a new entry tocontext.processed_maps_details.
- Responsibility: If no mask map is explicitly defined for the asset, this stage searches for a suitable source map (e.g., Albedo, Diffuse) with an alpha channel in
-
NormalMapGreenChannelStage(processing/pipeline/stages/normal_map_green_channel.py):- Responsibility: Identifies processed normal maps in
context.processed_maps_details. If the globalinvert_normal_map_green_channel_globallyconfiguration is true, it loads the temporary image data, inverts the green channel, saves a new temporary modified normal map, and updates the corresponding details incontext.processed_maps_details. - Context Interaction: Reads from and updates
context.processed_maps_details.
- Responsibility: Identifies processed normal maps in
Core Item Processing Loop
The PipelineOrchestrator iterates through the context.processing_items list (populated by the PrepareProcessingItemsStage). For each item (either a FileRule for a regular map or a MergeTaskDefinition for a merged map), the following stages are executed sequentially:
-
PrepareProcessingItemsStage(processing/pipeline/stages/prepare_processing_items.py):- Responsibility: (Executed once before the loop) Creates the
context.processing_itemslist by combiningFileRules fromcontext.files_to_processandMergeTaskDefinitions derived from the globalmerged_image_tasksconfiguration. Initializescontext.intermediate_results. - Context Interaction: Populates
context.processing_itemsand initializescontext.intermediate_results.
- Responsibility: (Executed once before the loop) Creates the
-
RegularMapProcessorStage(processing/pipeline/stages/regular_map_processor.py):- Responsibility: (Executed per
FileRuleitem) Loads the image data for a single file, determines its potentially suffixed internal map type, applies in-memory transformations (Gloss-to-Rough, Normal Green Invert), and returns the processed image data and details in aProcessedRegularMapDataobject. - Context Interaction: Reads from the input
FileRuleandConfiguration. Returns aProcessedRegularMapDataobject which is stored incontext.intermediate_results.
- Responsibility: (Executed per
-
MergedTaskProcessorStage(processing/pipeline/stages/merged_task_processor.py):- Responsibility: (Executed per
MergeTaskDefinitionitem) Loads and prepares multiple input images based on the merge task definition (including fallbacks and in-memory transformations), handles dimension mismatches, performs the channel merging operation, and returns the merged image data and details in aProcessedMergedMapDataobject. - Context Interaction: Reads from the input
MergeTaskDefinition,context.workspace_path, andConfiguration. Returns aProcessedMergedMapDataobject which is stored incontext.intermediate_results.
- Responsibility: (Executed per
-
InitialScalingStage(processing/pipeline/stages/initial_scaling.py):- Responsibility: (Executed per item) Applies initial scaling (e.g., Power-of-Two downscaling) to the image data from the previous processing stage based on the
initial_scaling_modeconfiguration. - Context Interaction: Takes a
InitialScalingInput(containing image data and config) and returns anInitialScalingOutputobject, which updates the item's entry incontext.intermediate_results.
- Responsibility: (Executed per item) Applies initial scaling (e.g., Power-of-Two downscaling) to the image data from the previous processing stage based on the
-
SaveVariantsStage(processing/pipeline/stages/save_variants.py):- Responsibility: (Executed per item) Takes the final processed image data (potentially scaled) and configuration, and calls a utility to save the image to temporary files in various resolutions and formats as defined by the configuration.
- Context Interaction: Takes a
SaveVariantsInputobject. Returns aSaveVariantsOutputobject containing details about the saved temporary files. The orchestrator stores these details incontext.processed_maps_detailsfor the item.
Post-Item Stages
These stages are executed sequentially once for each asset after the core item processing loop has finished for all items.
-
OutputOrganizationStage(processing/pipeline/stages/output_organization.py):- Responsibility: Determines the final output paths for all processed maps (including variants) and extra files based on configured patterns. It copies the temporary files generated by the core stages to these final destinations, creating directories as needed and respecting overwrite settings.
- Context Interaction: Reads from
context.processed_maps_details,context.files_to_process(for 'EXTRA' files),context.output_base_path, andConfiguration. Updates entries incontext.processed_maps_detailswith final paths and organization status. Populatescontext.asset_metadata['final_output_files'].
-
MetadataFinalizationAndSaveStage(processing/pipeline/stages/metadata_finalization_save.py):- Responsibility: Finalizes the
context.asset_metadata(setting end time, final status based on flags). It restructures the processed map details for inclusion, determines the save path for the metadata file based on configuration and patterns, serializes the metadata to JSON, and saves themetadata.jsonfile to the final output location. - Context Interaction: Reads from
context.asset_metadata,context.processed_maps_details,context.merged_maps_details,context.output_base_path, andConfiguration. Writes themetadata.jsonfile and updatescontext.asset_metadatawith its final path and status.
- Responsibility: Finalizes the
External Steps
Certain steps are integral to the overall asset processing workflow but are handled outside the PipelineOrchestrator's direct execution loop:
- Workspace Preparation and Cleanup: Handled by the code that invokes
ProcessingEngine.process()(e.g.,main.ProcessingTask,monitor._process_archive_task), typically involving extracting archives and setting up temporary directories. The engine itself manages a sub-temporary directory (engine_temp_dir) for intermediate processing files. - Prediction and Rule Generation: Performed before the
ProcessingEngineis called. This involves analyzing source files and generating theSourceRuleobject with its nestedAssetRules andFileRules, often involving prediction logic (potentially using LLMs). - Optional Blender Script Execution: Can be triggered externally after successful processing to perform tasks like material setup in Blender using the generated output files and metadata.
This staged pipeline provides a modular and extensible architecture for asset processing, with clear separation of concerns for each step. The AssetProcessingContext ensures that data flows consistently between these stages.