# Developer Guide: Processing Pipeline
This document details the step-by-step technical process executed by the asset processing pipeline, which is initiated by the `ProcessingEngine` class (`processing_engine.py`) and orchestrated by the `PipelineOrchestrator` (`processing/pipeline/orchestrator.py`).
The `ProcessingEngine.process()` method serves as the main entry point. It initializes a `PipelineOrchestrator` instance, providing it with the application's `Configuration` object and predefined lists of pre-item and post-item processing stages. The `PipelineOrchestrator.process_source_rule()` method then manages the execution of these stages for each asset defined in the input `SourceRule`.
A crucial component in this architecture is the `AssetProcessingContext` (`processing/pipeline/asset_context.py`). An instance of this dataclass is created for each `AssetRule` being processed. It acts as a stateful container, carrying all relevant data (source files, rules, configuration, intermediate results, metadata), and is passed sequentially through each stage. Each stage can read from and write to the context, allowing data to flow and be modified throughout the pipeline.
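Illustratively, such a context can be sketched as a dataclass like the one below. Only fields mentioned in this guide are included; the exact names and types in `processing/pipeline/asset_context.py` may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class AssetProcessingContext:
    """Stateful per-asset container passed through every stage (a sketch;
    the real class lives in processing/pipeline/asset_context.py)."""
    asset_rule: Any = None            # the AssetRule being processed
    source_rule: Any = None           # the enclosing SourceRule
    config_obj: Any = None            # the application Configuration
    effective_supplier: Optional[str] = None
    status_flags: dict = field(default_factory=dict)
    asset_metadata: dict = field(default_factory=dict)
    files_to_process: list = field(default_factory=list)
    processing_items: list = field(default_factory=list)
    processed_maps_details: dict = field(default_factory=dict)
    merged_maps_details: dict = field(default_factory=dict)
    intermediate_results: dict = field(default_factory=dict)

# Stages mutate the shared instance rather than returning new objects:
ctx = AssetProcessingContext()
ctx.status_flags["skip_asset"] = False
```

Using `field(default_factory=dict)` ensures each asset gets its own mutable containers instead of dataclass-level shared ones.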
The pipeline execution for each asset follows this general flow:
- Pre-Item Stages: A sequence of stages executed once per asset before the core item processing loop. These stages typically perform initial setup, filtering, and asset-level transformations.
- Core Item Processing Loop: The `PipelineOrchestrator` iterates through a list of "processing items" (individual files or merge tasks) prepared by a dedicated stage. For each item, a sequence of core processing stages is executed.
- Post-Item Stages: A sequence of stages executed once per asset after the core item processing loop is complete. These stages handle final tasks like organizing output files and saving metadata.
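The three phases above can be sketched as a minimal orchestration loop. This is a toy model; the real `PipelineOrchestrator.process_source_rule()` signature and stage interface are assumptions here.

```python
class RecordingStage:
    """Toy stage that records its name so ordering can be observed."""
    def __init__(self, name):
        self.name = name

    def run(self, context, item=None):
        context["trace"].append((self.name, item))

class PipelineOrchestrator:
    """Hedged sketch of the per-asset flow; the real class lives in
    processing/pipeline/orchestrator.py and its API may differ."""
    def __init__(self, pre_item_stages, core_item_stages, post_item_stages):
        self.pre_item_stages = pre_item_stages
        self.core_item_stages = core_item_stages
        self.post_item_stages = post_item_stages

    def process_asset(self, context):
        for stage in self.pre_item_stages:        # once per asset, before the loop
            stage.run(context)
        for item in context["processing_items"]:  # core loop: per item
            for stage in self.core_item_stages:
                stage.run(context, item)
        for stage in self.post_item_stages:       # once per asset, after the loop
            stage.run(context)
        return context

ctx = {"processing_items": ["item_a", "item_b"], "trace": []}
PipelineOrchestrator(
    [RecordingStage("pre")], [RecordingStage("core")], [RecordingStage("post")]
).process_asset(ctx)
# trace: "pre" once, "core" once per item, "post" once
```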
## Pipeline Stages
The stages are executed in the following order for each asset:
### Pre-Item Stages
These stages are executed sequentially once for each asset before the core item processing loop begins.
- `SupplierDeterminationStage` (`processing/pipeline/stages/supplier_determination.py`):
  - Responsibility: Determines the effective supplier for the asset based on the `SourceRule`'s `supplier_override`, `supplier_identifier`, and validation against configured suppliers.
  - Context Interaction: Sets `context.effective_supplier` and may set a `supplier_error` flag in `context.status_flags`.
- `AssetSkipLogicStage` (`processing/pipeline/stages/asset_skip_logic.py`):
  - Responsibility: Checks if the entire asset should be skipped based on conditions like a missing/invalid supplier, a "SKIP" status in asset metadata, or if the asset is already processed and overwrite is disabled.
  - Context Interaction: Sets the `skip_asset` flag and `skip_reason` in `context.status_flags` if the asset should be skipped.
- `MetadataInitializationStage` (`processing/pipeline/stages/metadata_initialization.py`):
  - Responsibility: Initializes the `context.asset_metadata` dictionary with base information derived from the `AssetRule`, `SourceRule`, and `Configuration`. This includes asset name, IDs, source/output paths, timestamps, and initial status.
  - Context Interaction: Populates `context.asset_metadata`. Initializes `context.processed_maps_details` and `context.merged_maps_details` as empty dictionaries (these are used internally by subsequent stages but are not directly part of the final `metadata.json` in their original form).
- `FileRuleFilterStage` (`processing/pipeline/stages/file_rule_filter.py`):
  - Responsibility: Filters the `FileRule` objects associated with the asset to determine which individual files should be considered for processing. It identifies and excludes files matching "FILE_IGNORE" rules based on their `item_type`.
  - Context Interaction: Populates `context.files_to_process` with the list of `FileRule` objects that are not ignored.
- `GlossToRoughConversionStage` (`processing/pipeline/stages/gloss_to_rough_conversion.py`):
  - Responsibility: Identifies processed maps in `context.processed_maps_details` whose `internal_map_type` starts with "MAP_GLOSS". If found, it loads the temporary image data, inverts it using the shared utility function `apply_common_map_transformations`, saves a new temporary roughness map ("MAP_ROUGH"), and updates the corresponding details in `context.processed_maps_details` (setting `internal_map_type` to "MAP_ROUGH") and the relevant `FileRule` in `context.files_to_process` (setting `item_type` to "MAP_ROUGH").
  - Context Interaction: Reads from and updates `context.processed_maps_details` (specifically `internal_map_type` and `temp_processed_file`) and `context.files_to_process` (specifically `item_type`).
- `AlphaExtractionToMaskStage` (`processing/pipeline/stages/alpha_extraction_to_mask.py`):
  - Responsibility: If no mask map is explicitly defined for the asset (as a `FileRule` with `item_type="MAP_MASK"`), this stage searches `context.processed_maps_details` for a suitable source map (e.g., a "MAP_COL" with an alpha channel, based on its `internal_map_type`). If found, it extracts the alpha channel, saves it as a new temporary mask map, and adds a new `FileRule` (with `item_type="MAP_MASK"`) and corresponding details (with `internal_map_type="MAP_MASK"`) to the context.
  - Context Interaction: Reads from `context.processed_maps_details`, adds a new `FileRule` to `context.files_to_process`, and adds a new entry to `context.processed_maps_details` (setting `internal_map_type`).
- `NormalMapGreenChannelStage` (`processing/pipeline/stages/normal_map_green_channel.py`):
  - Responsibility: Identifies processed normal maps in `context.processed_maps_details` (those with an `internal_map_type` starting with "MAP_NRM"). If the global `invert_normal_map_green_channel_globally` configuration is true, it loads the temporary image data, inverts the green channel using the shared utility function `apply_common_map_transformations`, saves a new temporary modified normal map, and updates the `temp_processed_file` path in `context.processed_maps_details`.
  - Context Interaction: Reads from and updates `context.processed_maps_details` (specifically `temp_processed_file` and `notes`).
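At their core, both `GlossToRoughConversionStage` and `NormalMapGreenChannelStage` invert 8-bit channel values; in the real pipeline this goes through `apply_common_map_transformations`, whose signature is not shown in this guide. A pure-Python sketch on lists of RGB tuples (the pipeline's actual image representation is an assumption here):

```python
def invert_channel(pixels, channel=None):
    """Invert 8-bit values. With channel=None, invert every channel
    (gloss-to-rough); with an index, invert only that channel (e.g. the
    normal map's green channel at index 1). `pixels` is a list of
    (r, g, b) tuples -- an illustrative stand-in for real image data."""
    out = []
    for px in pixels:
        if channel is None:
            out.append(tuple(255 - v for v in px))
        else:
            q = list(px)
            q[channel] = 255 - q[channel]
            out.append(tuple(q))
    return out

gloss = [(200, 200, 200)]
rough = invert_channel(gloss)             # gloss-to-rough: invert all channels
nrm = [(128, 60, 255)]
flipped = invert_channel(nrm, channel=1)  # flip only the green channel
```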
### Core Item Processing Loop
The `PipelineOrchestrator` iterates through the `context.processing_items` list (populated by the `PrepareProcessingItemsStage`). Each item in this list is now either a `ProcessingItem` (representing a specific variant of a source map, e.g., Color at 1K, or Color at LOWRES) or a `MergeTaskDefinition`.
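For illustration, the two item kinds might be modeled roughly as follows; the field names are inferred from this guide and are not authoritative.

```python
from dataclasses import dataclass
from typing import Any, Tuple, Dict

@dataclass
class ProcessingItem:
    """One resolution variant of a source map (sketch; fields inferred
    from this guide, the real definition may differ)."""
    map_type_identifier: str            # e.g. "MAP_COL"
    resolution_key: str                 # e.g. "1K", "PREVIEW", "LOWRES"
    image_data: Any                     # copy of the decoded source pixels
    current_dimensions: Tuple[int, int] # (width, height)

@dataclass
class MergeTaskDefinition:
    """A channel-merge task derived from global map_merge_rules (sketch;
    field names are illustrative assumptions)."""
    output_map_type: str                # e.g. an "ORM"-style packed map
    channel_sources: Dict[str, str]     # output channel -> source map type

item = ProcessingItem("MAP_COL", "1K", image_data=None,
                      current_dimensions=(1024, 1024))
```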
- `PrepareProcessingItemsStage` (`processing/pipeline/stages/prepare_processing_items.py`):
  - Responsibility: (Executed once before the loop) This stage is now responsible for "exploding" each relevant `FileRule` into one or more `ProcessingItem` objects.
    - For each `FileRule` that represents an image map:
      - It loads the source image data and determines its original dimensions and bit depth.
      - It creates standard `ProcessingItem`s for each required output resolution (e.g., "1K", "PREVIEW"), populating them with a copy of the source image data and the respective `resolution_key`.
      - If the "Low-Resolution Fallback" feature is enabled (`ENABLE_LOW_RESOLUTION_FALLBACK` in config) and the source image's largest dimension is below `LOW_RESOLUTION_THRESHOLD`, it creates an additional `ProcessingItem` with `resolution_key="LOWRES"`, using the original image data and dimensions.
    - It also adds `MergeTaskDefinition`s derived from the global `map_merge_rules`.
  - Context Interaction: Reads `context.files_to_process` and `context.config_obj`. Populates `context.processing_items` with a list of `ProcessingItem` and `MergeTaskDefinition` objects. Initializes `context.intermediate_results`.
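The resolution-key planning described above can be sketched as a small pure function. Parameter names are assumptions; the real stage reads `ENABLE_LOW_RESOLUTION_FALLBACK` and `LOW_RESOLUTION_THRESHOLD` from the `Configuration` object.

```python
def plan_resolution_keys(source_dims, standard_keys,
                         low_res_fallback_enabled, low_res_threshold):
    """Decide which resolution_keys get a ProcessingItem for one source
    map. A sketch of the rule described in this guide: standard keys
    always, plus "LOWRES" when the fallback is enabled and the source's
    largest dimension is below the threshold."""
    keys = list(standard_keys)  # e.g. ["1K", "PREVIEW"]
    if low_res_fallback_enabled and max(source_dims) < low_res_threshold:
        keys.append("LOWRES")   # keeps the original data and dimensions
    return keys

# A 512x512 source below a 1024-pixel threshold gains a LOWRES variant:
keys = plan_resolution_keys((512, 512), ["1K", "PREVIEW"], True, 1024)
```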
For each item in `context.processing_items`:
- Transformations (implicit, or via a dedicated stage; formerly `RegularMapProcessorStage` logic):
  - Responsibility: If the `item` is a `ProcessingItem`, its `image_data` (loaded by `PrepareProcessingItemsStage`) may need transformations (Gloss-to-Rough, Normal Green Invert). This logic, previously in `RegularMapProcessorStage`, might be integrated into `PrepareProcessingItemsStage` before `ProcessingItem` creation, or handled by a new dedicated transformation stage that operates on `ProcessingItem.image_data`. The `item.map_type_identifier` would be updated if a transformation like Gloss-to-Rough occurs.
  - Context Interaction: Modifies `item.image_data` and `item.map_type_identifier` within the `ProcessingItem` object.
- `MergedTaskProcessorStage` (`processing/pipeline/stages/merged_task_processor.py`):
  - Responsibility: (Executed if `item` is a `MergeTaskDefinition`) Same as before: validates inputs, loads source map data (likely from `ProcessingItem`s in `context.processing_items` or a cache populated from them), applies transformations, merges channels, and returns `ProcessedMergedMapData`.
  - Context Interaction: Reads the `MergeTaskDefinition` and potentially `context.processing_items` (or a cache derived from it) for input image data. Returns `ProcessedMergedMapData`.
- `InitialScalingStage` (`processing/pipeline/stages/initial_scaling.py`):
  - Responsibility: (Executed per item)
    - If `item` is a `ProcessingItem`: Takes `item.image_data`, `item.current_dimensions`, and `item.resolution_key` as input. If `item.resolution_key` is "LOWRES", POT scaling is skipped. Otherwise, applies POT scaling if configured.
    - If `item` is from a `MergeTaskDefinition` (i.e., `processed_data` from `MergedTaskProcessorStage`): Applies POT scaling as before.
  - Context Interaction: Takes `InitialScalingInput` (now including `resolution_key`). Returns `InitialScalingOutput` (also including `resolution_key`), which updates `context.intermediate_results`. The `current_image_data` and `current_dimensions` for saving are taken from this output.
- `SaveVariantsStage` (`processing/pipeline/stages/save_variants.py`):
  - Responsibility: (Executed per item) Saves the (potentially scaled) `current_image_data`.
  - Context Interaction:
    - Takes `SaveVariantsInput`. `internal_map_type` is set from `item.map_type_identifier` (for a `ProcessingItem`) or `processed_data.output_map_type` (for merged data). `output_filename_pattern_tokens['resolution']` is set to the `resolution_key` obtained from `scaled_data_output.resolution_key` (which originates from `item.resolution_key` for `ProcessingItem`s, or is `None` for merged items, which get all standard resolutions).
    - The `image_resolutions` argument for `SaveVariantsInput` is built as follows:
      - If `resolution_key == "LOWRES"`: Set to `{"LOWRES": width_of_lowres_data}`.
      - If `resolution_key` is a standard key (e.g., "1K"): Set to `{resolution_key: configured_dimension}`.
      - For merged items (where the `resolution_key` from scaling is likely `None`): Set to the full `config.image_resolutions` map to generate all applicable standard sizes.
    - Returns `SaveVariantsOutput`. The orchestrator stores details in `context.processed_maps_details`.
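The three-way `image_resolutions` selection can be expressed as a small helper. This is a sketch of the rule above; the actual construction of `SaveVariantsInput` inside `SaveVariantsStage` is not shown in this guide.

```python
def select_image_resolutions(resolution_key, lowres_width,
                             config_image_resolutions):
    """Build the image_resolutions argument for SaveVariantsInput
    (illustrative helper; names and types are assumptions)."""
    if resolution_key == "LOWRES":
        # Keep the low-res data at its native width.
        return {"LOWRES": lowres_width}
    if resolution_key is not None:
        # A single standard key such as "1K": use its configured size.
        return {resolution_key: config_image_resolutions[resolution_key]}
    # Merged items arrive with no resolution_key: emit all standard sizes.
    return dict(config_image_resolutions)

config_res = {"1K": 1024, "4K": 4096}
```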
### Post-Item Stages
These stages are executed sequentially once for each asset after the core item processing loop has finished for all items.
- `OutputOrganizationStage` (`processing/pipeline/stages/output_organization.py`):
  - Responsibility: Determines the final output paths for all processed maps (including variants) and extra files based on configured patterns. It copies the temporary files generated by the core stages to these final destinations, creating directories as needed and respecting overwrite settings.
  - Context Interaction: Reads from `context.processed_maps_details`, `context.files_to_process` (for 'EXTRA' files), `context.output_base_path`, and `Configuration`. Updates entries in `context.processed_maps_details` with organization status. Populates `context.asset_metadata['maps']` with the final map structure:
    - The `maps` object is a dictionary where keys are standard map types (e.g., "COL", "REFL").
    - Each entry contains a `variant_paths` dictionary, where keys are resolution strings (e.g., "8K", "4K") and values are the filenames of the map variants (relative to the asset's output directory).
  - It also populates `context.asset_metadata['final_output_files']` with a list of absolute paths to all generated files (this list itself is not saved in the final `metadata.json`).
- `MetadataFinalizationAndSaveStage` (`processing/pipeline/stages/metadata_finalization_save.py`):
  - Responsibility: Finalizes the `context.asset_metadata` (setting the final status based on flags). It determines the save path for the metadata file based on configuration and patterns, serializes the `context.asset_metadata` (which now contains the structured `maps` data from `OutputOrganizationStage`) to JSON, and saves the `metadata.json` file.
  - Context Interaction: Reads from `context.asset_metadata` (including the `maps` structure), `context.output_base_path`, and `Configuration`. Before saving, it explicitly removes the `final_output_files` key from `context.asset_metadata`. The `processing_end_time` is also no longer added. The `metadata.json` file is written, and `context.asset_metadata` is updated with its final path and status. The older `processed_maps_details` and `merged_maps_details` from the context are not directly included in the JSON.
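The stripping of `final_output_files` before serialization might look like the following; the helper name is illustrative, while the `variant_paths` layout matches the structure described above.

```python
import json

def finalize_metadata(asset_metadata):
    """Drop keys that must not reach metadata.json, then serialize
    (illustrative helper sketching the save step described above)."""
    meta = dict(asset_metadata)
    meta.pop("final_output_files", None)  # internal bookkeeping, never saved
    return json.dumps(meta, indent=2)

asset_metadata = {
    "asset_name": "example_asset",
    "maps": {
        "COL": {"variant_paths": {"4K": "example_COL_4K.png",
                                  "1K": "example_COL_1K.png"}},
    },
    "final_output_files": ["/tmp/out/example_COL_4K.png"],
}
payload = finalize_metadata(asset_metadata)
```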
## External Steps
Certain steps are integral to the overall asset processing workflow but are handled outside the `PipelineOrchestrator`'s direct execution loop:
- Workspace Preparation and Cleanup: Handled by the code that invokes `ProcessingEngine.process()` (e.g., `main.ProcessingTask`, `monitor._process_archive_task`), typically involving extracting archives and setting up temporary directories. The engine itself manages a sub-temporary directory (`engine_temp_dir`) for intermediate processing files.
- Prediction and Rule Generation: Performed before the `ProcessingEngine` is called. This involves analyzing source files and generating the `SourceRule` object with its nested `AssetRule`s and `FileRule`s, often involving prediction logic (potentially using LLMs).
- Optional Blender Script Execution: Can be triggered externally after successful processing to perform tasks like material setup in Blender, using the generated output files and metadata.
This staged pipeline provides a modular and extensible architecture for asset processing, with a clear separation of concerns for each step. The `AssetProcessingContext` ensures that data flows consistently between these stages.