2025-05-13 04:01:38 +02:00

# Developer Guide: Processing Pipeline

This document details the step-by-step technical process executed by the asset processing pipeline, which is initiated by the ProcessingEngine class (processing_engine.py) and orchestrated by the PipelineOrchestrator (processing/pipeline/orchestrator.py).

The ProcessingEngine.process() method serves as the main entry point. It initializes a PipelineOrchestrator instance, providing it with the application's Configuration object and predefined lists of pre-item and post-item processing stages. The PipelineOrchestrator.process_source_rule() method then manages the execution of these stages for each asset defined in the input SourceRule.

A crucial component in this architecture is the AssetProcessingContext (processing/pipeline/asset_context.py). An instance of this dataclass is created for each AssetRule being processed. It acts as a stateful container that carries all relevant data (source files, rules, configuration, intermediate results, metadata) and is passed sequentially through each stage. Each stage can read from and write to the context, so data flows through the pipeline and can be modified at every step.
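
As a rough illustration of the container described above, the following sketch declares a dataclass with the context attributes this guide mentions. The attribute names come from the guide; the types, defaults, and the `mark_skipped` helper are assumptions made for the example and may not match processing/pipeline/asset_context.py.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional


@dataclass
class AssetProcessingContext:
    # Inputs for one asset. Names follow the guide; types are assumed.
    config_obj: Any
    workspace_path: Path
    output_base_path: Path
    # Mutable state that stages fill in as the asset moves through the pipeline.
    effective_supplier: Optional[str] = None
    status_flags: Dict[str, Any] = field(default_factory=dict)
    asset_metadata: Dict[str, Any] = field(default_factory=dict)
    files_to_process: List[Any] = field(default_factory=list)
    processing_items: List[Any] = field(default_factory=list)
    intermediate_results: Dict[Any, Any] = field(default_factory=dict)
    processed_maps_details: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    merged_maps_details: Dict[str, Dict[str, Any]] = field(default_factory=dict)


def mark_skipped(context: AssetProcessingContext, reason: str) -> AssetProcessingContext:
    """Hypothetical stage-style write: record why the asset should be skipped."""
    context.status_flags["skip_asset"] = True
    context.status_flags["skip_reason"] = reason
    return context


ctx = AssetProcessingContext(
    config_obj={},
    workspace_path=Path("workspace"),
    output_base_path=Path("output"),
)
mark_skipped(ctx, "supplier missing")
```

Using `field(default_factory=...)` gives every context its own dictionaries and lists, so state written for one asset cannot leak into the next.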

The pipeline execution for each asset follows this general flow:

  1. Pre-Item Stages: A sequence of stages executed once per asset before the core item processing loop. These stages typically perform initial setup, filtering, and asset-level transformations.
  2. Core Item Processing Loop: The PipelineOrchestrator iterates through a list of "processing items" (individual files or merge tasks) prepared by a dedicated stage. For each item, a sequence of core processing stages is executed.
  3. Post-Item Stages: A sequence of stages executed once per asset after the core item processing loop is complete. These stages handle final tasks like organizing output files and saving metadata.
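
The three phases above can be sketched as a small driver function. This is a simplified illustration, not the PipelineOrchestrator's actual code: the context is a plain dictionary, stages are plain callables, and `run_asset_pipeline` and the demo stage functions are hypothetical names.

```python
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]
AssetStage = Callable[[Context], Context]
ItemStage = Callable[[Context, Any], Context]


def run_asset_pipeline(
    context: Context,
    pre_item_stages: List[AssetStage],
    item_stages: List[ItemStage],
    post_item_stages: List[AssetStage],
) -> Context:
    # 1. Pre-item stages run once per asset.
    for stage in pre_item_stages:
        context = stage(context)
    # 2. Core loop: every item passes through every item stage, in order.
    for item in context.get("processing_items", []):
        for item_stage in item_stages:
            context = item_stage(context, item)
    # 3. Post-item stages run once per asset after the loop.
    for stage in post_item_stages:
        context = stage(context)
    return context


trace: List[str] = []


def list_items(context: Context) -> Context:
    trace.append("pre:list_items")
    context["processing_items"] = ["wood_col.png", "merge:MAP_NRMRGH"]
    return context


def process_item(context: Context, item: Any) -> Context:
    trace.append(f"item:{item}")
    return context


def finalize(context: Context) -> Context:
    trace.append("post:finalize")
    return context


result = run_asset_pipeline({}, [list_items], [process_item], [finalize])
```

After the call, `trace` records one pre-item entry, one entry per processing item, and one post-item entry, in that order.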

Pipeline Stages

The stages are executed in the following order for each asset:

Pre-Item Stages

These stages are executed sequentially once for each asset before the core item processing loop begins.

  1. SupplierDeterminationStage (processing/pipeline/stages/supplier_determination.py):

    • Responsibility: Determines the effective supplier for the asset based on the SourceRule's supplier_override, supplier_identifier, and validation against configured suppliers.
    • Context Interaction: Sets context.effective_supplier and may set a supplier_error flag in context.status_flags.
  2. AssetSkipLogicStage (processing/pipeline/stages/asset_skip_logic.py):

    • Responsibility: Checks whether the entire asset should be skipped, for example because the supplier is missing or invalid, the asset metadata carries a "SKIP" status, or the asset has already been processed and overwrite is disabled.
    • Context Interaction: Sets the skip_asset flag and skip_reason in context.status_flags if the asset should be skipped.
  3. MetadataInitializationStage (processing/pipeline/stages/metadata_initialization.py):

    • Responsibility: Initializes the context.asset_metadata dictionary with base information derived from the AssetRule, SourceRule, and Configuration. This includes asset name, IDs, source/output paths, timestamps, and initial status.
    • Context Interaction: Populates context.asset_metadata and initializes empty dictionaries for processed_maps_details and merged_maps_details.
  4. FileRuleFilterStage (processing/pipeline/stages/file_rule_filter.py):

    • Responsibility: Filters the FileRule objects associated with the asset to determine which individual files should be considered for processing. It identifies and excludes files matching "FILE_IGNORE" rules based on their item_type.
    • Context Interaction: Populates context.files_to_process with the list of FileRule objects that are not ignored.
  5. GlossToRoughConversionStage (processing/pipeline/stages/gloss_to_rough_conversion.py):

    • Responsibility: Identifies processed maps in context.processed_maps_details whose internal_map_type starts with "MAP_GLOSS". If found, it loads the temporary image data, inverts it using the shared utility function apply_common_map_transformations, saves a new temporary roughness map ("MAP_ROUGH"), and updates the corresponding details in context.processed_maps_details (setting internal_map_type to "MAP_ROUGH") and the relevant FileRule in context.files_to_process (setting item_type to "MAP_ROUGH").
    • Context Interaction: Reads from and updates context.processed_maps_details (specifically internal_map_type and temp_processed_file) and context.files_to_process (specifically item_type).
  6. AlphaExtractionToMaskStage (processing/pipeline/stages/alpha_extraction_to_mask.py):

    • Responsibility: If no mask map is explicitly defined for the asset (as a FileRule with item_type="MAP_MASK"), this stage searches context.processed_maps_details for a suitable source map (e.g., a "MAP_COL" with an alpha channel, based on its internal_map_type). If found, it extracts the alpha channel, saves it as a new temporary mask map, and adds a new FileRule (with item_type="MAP_MASK") and corresponding details (with internal_map_type="MAP_MASK") to the context.
    • Context Interaction: Reads from context.processed_maps_details, adds a new FileRule to context.files_to_process, and adds a new entry to context.processed_maps_details (setting internal_map_type).
  7. NormalMapGreenChannelStage (processing/pipeline/stages/normal_map_green_channel.py):

    • Responsibility: Identifies processed normal maps in context.processed_maps_details (those with an internal_map_type starting with "MAP_NRM"). If the global invert_normal_map_green_channel_globally configuration is true, it loads the temporary image data, inverts the green channel using the shared utility function apply_common_map_transformations, saves a new temporary modified normal map, and updates the temp_processed_file path in context.processed_maps_details.
    • Context Interaction: Reads from and updates context.processed_maps_details (specifically temp_processed_file and notes).
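
The three image operations named in the pre-item stages (gloss inversion, alpha extraction, green-channel inversion) are simple per-pixel transforms. The sketch below shows them in pure Python on nested lists, assuming 8-bit channel values. It is not the project's apply_common_map_transformations utility, whose signature this guide does not give; the function names here are hypothetical.

```python
from typing import List, Tuple

MAX_8BIT = 255  # Assumption: 8-bit channels; other bit depths need a different maximum.

GrayImage = List[List[int]]
RgbImage = List[List[Tuple[int, int, int]]]
RgbaImage = List[List[Tuple[int, int, int, int]]]


def gloss_to_rough(gloss: GrayImage) -> GrayImage:
    """Roughness is the inverse of glossiness: rough = max - gloss."""
    return [[MAX_8BIT - value for value in row] for row in gloss]


def extract_alpha(rgba: RgbaImage) -> GrayImage:
    """Keep only the fourth (alpha) channel as a single-channel mask."""
    return [[pixel[3] for pixel in row] for row in rgba]


def invert_green(normal: RgbImage) -> RgbImage:
    """Flip the green channel of a normal map, leaving red and blue untouched."""
    return [[(r, MAX_8BIT - g, b) for (r, g, b) in row] for row in normal]


rough = gloss_to_rough([[0, 255], [64, 200]])
mask = extract_alpha([[(10, 20, 30, 40), (1, 2, 3, 255)]])
flipped = invert_green([[(128, 100, 255)]])
```

In a real pipeline these operations would normally run on array data from an image library; the arithmetic is the same.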

Core Item Processing Loop

The PipelineOrchestrator iterates through the context.processing_items list (populated by the PrepareProcessingItemsStage, which runs once before the loop). For each item (either a FileRule for a regular map or a MergeTaskDefinition for a merged map), the applicable stages below are executed sequentially:

  1. PrepareProcessingItemsStage (processing/pipeline/stages/prepare_processing_items.py):

    • Responsibility: (Executed once before the loop) Creates the context.processing_items list by combining FileRules from context.files_to_process and MergeTaskDefinitions derived from the global map_merge_rules configuration. It reads map_merge_rules from context.config_obj and validates that each merge rule has an output_map_type and a dictionary of inputs. It also initializes context.intermediate_results.
    • Context Interaction: Reads from context.files_to_process and context.config_obj (accessing map_merge_rules). Populates context.processing_items and initializes context.intermediate_results.
  2. RegularMapProcessorStage (processing/pipeline/stages/regular_map_processor.py):

    • Responsibility: (Executed per FileRule item) Checks if the FileRule.item_type starts with "MAP_". If not, the item is skipped. Otherwise, it loads the image data for the file, determines its potentially suffixed internal map type (e.g., "MAP_COL-1"), applies in-memory transformations (Gloss-to-Rough, Normal Green Invert) using the shared utility function apply_common_map_transformations, and returns the processed image data and details in a ProcessedRegularMapData object. The internal_map_type in the output reflects any transformations (e.g., "MAP_GLOSS" becomes "MAP_ROUGH").
    • Context Interaction: Reads from the input FileRule (checking item_type) and Configuration. Returns a ProcessedRegularMapData object which is stored in context.intermediate_results.
  3. MergedTaskProcessorStage (processing/pipeline/stages/merged_task_processor.py):

    • Responsibility: (Executed per MergeTaskDefinition item) Validates that all input map types specified in the merge rule start with "MAP_"; if any does not, the task fails. It dynamically loads input images by looking up the required input map types (e.g., "MAP_NRM") in context.processed_maps_details and using the temporary file paths from their saved_files_info. It applies in-memory transformations to inputs using apply_common_map_transformations, handles dimension mismatches (with fallback creation if configured and source_dimensions are available), performs the channel merging operation, and returns the merged image data and details in a ProcessedMergedMapData object. The output_map_type of the merged map must also be "MAP_"-prefixed in the configuration.
    • Context Interaction: Reads from the input MergeTaskDefinition (checking input map types), context.workspace_path, context.processed_maps_details (for input image data), and Configuration. Returns a ProcessedMergedMapData object which is stored in context.intermediate_results.
  4. InitialScalingStage (processing/pipeline/stages/initial_scaling.py):

    • Responsibility: (Executed per item) Applies initial scaling (e.g., Power-of-Two downscaling) to the image data from the previous processing stage based on the initial_scaling_mode configuration.
    • Context Interaction: Takes an InitialScalingInput (containing image data and config) and returns an InitialScalingOutput object, which updates the item's entry in context.intermediate_results.
  5. SaveVariantsStage (processing/pipeline/stages/save_variants.py):

    • Responsibility: (Executed per item) Takes the final processed image data (potentially scaled) and configuration, and calls a utility to save the image to temporary files in various resolutions and formats as defined by the configuration.
    • Context Interaction: Takes a SaveVariantsInput object (which includes the "MAP_" prefixed internal_map_type). It uses the get_filename_friendly_map_type utility to convert this to a "standard type" (e.g., "COL") for output naming. Returns a SaveVariantsOutput object containing details about the saved temporary files. The orchestrator stores these details, including the original "MAP_" prefixed internal_map_type, in context.processed_maps_details for the item.
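
A minimal sketch of how the item list and the "MAP_" prefix checks described above might fit together, followed by one possible reading of Power-of-Two downscaling. The `FileRule` and `MergeTaskDefinition` classes here are hypothetical stand-ins with only the fields needed for the example. Two further assumptions: each merge rule is a dictionary whose inputs map a channel name to a map type, and invalid merge rules are dropped (the guide says rules are validated but not what happens when validation fails).

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FileRule:
    # Hypothetical minimal stand-in for the project's FileRule.
    file_path: str
    item_type: str


@dataclass
class MergeTaskDefinition:
    # Hypothetical minimal stand-in for the project's MergeTaskDefinition.
    output_map_type: str
    inputs: Dict[str, str]


def prepare_processing_items(
    files_to_process: List[FileRule],
    map_merge_rules: List[Dict[str, Any]],
) -> List[Any]:
    """Combine file rules with merge tasks built from valid merge rules."""
    items: List[Any] = list(files_to_process)
    for rule in map_merge_rules:
        output_type = rule.get("output_map_type")
        inputs = rule.get("inputs")
        if not output_type or not isinstance(inputs, dict):
            continue  # Assumption: invalid rules are dropped in this sketch.
        items.append(MergeTaskDefinition(output_type, inputs))
    return items


def is_processable(item: Any) -> bool:
    """Apply the "MAP_" prefix checks described for the two processor stages."""
    if isinstance(item, FileRule):
        return item.item_type.startswith("MAP_")
    return all(map_type.startswith("MAP_") for map_type in item.inputs.values())


def floor_power_of_two(size: int) -> int:
    """Largest power of two that does not exceed size (assumed POT behaviour)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return 1 << (size.bit_length() - 1)


items = prepare_processing_items(
    [FileRule("wood_col.png", "MAP_COL"), FileRule("preview.jpg", "EXTRA")],
    [
        {"output_map_type": "MAP_NRMRGH",
         "inputs": {"R": "MAP_NRM", "G": "MAP_NRM", "B": "MAP_ROUGH"}},
        {"inputs": {}},  # Missing output_map_type, so this rule is rejected.
    ],
)
processable = [is_processable(item) for item in items]
scaled = [floor_power_of_two(n) for n in (1000, 1024, 3000)]
```

Here the "EXTRA" file stays in the item list but is not treated as a map, which matches the guide's description of RegularMapProcessorStage skipping items whose item_type lacks the "MAP_" prefix.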

Post-Item Stages

These stages are executed sequentially once for each asset after the core item processing loop has finished for all items.

  1. OutputOrganizationStage (processing/pipeline/stages/output_organization.py):

    • Responsibility: Determines the final output paths for all processed maps (including variants) and extra files based on configured patterns. It copies the temporary files generated by the core stages to these final destinations, creating directories as needed and respecting overwrite settings.
    • Context Interaction: Reads from context.processed_maps_details (using the "MAP_" prefixed internal_map_type to get the "standard type" via get_filename_friendly_map_type for output naming), context.files_to_process (for 'EXTRA' files), context.output_base_path, and Configuration. Updates entries in context.processed_maps_details with final paths and organization status. Populates context.asset_metadata['final_output_files']. (Note: legacy code for the 'Processed_With_Variants' status has been removed from this stage.)
  2. MetadataFinalizationAndSaveStage (processing/pipeline/stages/metadata_finalization_save.py):

    • Responsibility: Finalizes the context.asset_metadata (setting end time, final status based on flags). It restructures the processed map details for inclusion, determines the save path for the metadata file based on configuration and patterns, serializes the metadata to JSON, and saves the metadata.json file to the final output location.
    • Context Interaction: Reads from context.asset_metadata, context.processed_maps_details, context.merged_maps_details, context.output_base_path, and Configuration. Writes the metadata.json file and updates context.asset_metadata with its final path and status.
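
The finalization step can be pictured as "derive a status from the flags, stamp an end time, write JSON". The sketch below does that with the standard library only. The status strings, the `end_time` and `status` keys, and both function names are assumptions for illustration; only the skip_asset and supplier_error flags and the metadata.json file name come from this guide.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict


def finalize_metadata(asset_metadata: Dict[str, Any], status_flags: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the metadata with an end time and a final status."""
    finalized = dict(asset_metadata)
    finalized["end_time"] = datetime.now(timezone.utc).isoformat()
    # Status values are hypothetical; the real stage may use different ones.
    if status_flags.get("skip_asset"):
        finalized["status"] = "Skipped"
    elif status_flags.get("supplier_error"):
        finalized["status"] = "Failed"
    else:
        finalized["status"] = "Processed"
    return finalized


def save_metadata(metadata: Dict[str, Any], output_dir: Path) -> Path:
    """Serialize the metadata to metadata.json inside output_dir."""
    output_dir.mkdir(parents=True, exist_ok=True)
    path = output_dir / "metadata.json"
    path.write_text(json.dumps(metadata, indent=2, sort_keys=True), encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    meta = finalize_metadata({"asset_name": "OakPlanks"}, {})
    saved_path = save_metadata(meta, Path(tmp) / "OakPlanks")
    loaded = json.loads(saved_path.read_text(encoding="utf-8"))
```

Returning a copy from `finalize_metadata` keeps the example easy to test; a stage that mutates context.asset_metadata in place would work the same way.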

External Steps

Certain steps are integral to the overall asset processing workflow but are handled outside the PipelineOrchestrator's direct execution loop:

  • Workspace Preparation and Cleanup: Handled by the code that invokes ProcessingEngine.process() (e.g., main.ProcessingTask, monitor._process_archive_task), typically involving extracting archives and setting up temporary directories. The engine itself manages a sub-temporary directory (engine_temp_dir) for intermediate processing files.
  • Prediction and Rule Generation: Performed before the ProcessingEngine is called. This involves analyzing source files and generating the SourceRule object with its nested AssetRules and FileRules, often involving prediction logic (potentially using LLMs).
  • Optional Blender Script Execution: Can be triggered externally after successful processing to perform tasks like material setup in Blender using the generated output files and metadata.
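
For the workspace preparation and cleanup step, a caller might look roughly like the sketch below: extract the archive into a temporary directory, hand that directory to the processing entry point, and let the directory be removed afterwards. The `process` parameter is a hypothetical callable standing in for ProcessingEngine.process(), whose real signature this guide does not give, and ZIP input is assumed.

```python
import tempfile
import zipfile
from pathlib import Path
from typing import Any, Callable


def process_archive(archive_path: Path, process: Callable[[Path], Any]) -> Any:
    """Extract an archive into a temporary workspace, process it, then clean up."""
    with tempfile.TemporaryDirectory(prefix="asset_workspace_") as tmp:
        workspace = Path(tmp)
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(workspace)
        # The workspace is deleted when this block exits, even if process() raises.
        return process(workspace)


# Demo: build a tiny archive and list what the stand-in "engine" sees.
with tempfile.TemporaryDirectory() as scratch:
    demo_archive = Path(scratch) / "asset.zip"
    with zipfile.ZipFile(demo_archive, "w") as archive:
        archive.writestr("wood_col.png", b"fake image bytes")
    extracted = process_archive(
        demo_archive,
        lambda workspace: sorted(p.name for p in workspace.iterdir()),
    )
```

Because the context manager owns the directory, cleanup stays with the caller, consistent with the division of responsibility described above.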

This staged pipeline provides a modular and extensible architecture for asset processing, with clear separation of concerns for each step. The AssetProcessingContext ensures that data flows consistently between these stages.