# --- Imports --- import logging import shutil import tempfile from pathlib import Path from typing import List, Dict, Optional, Any, Union # Added Any, Union import numpy as np # Added numpy from configuration import Configuration from rule_structure import SourceRule, AssetRule, FileRule, ProcessingItem # Added ProcessingItem # Import new context classes and stages from .asset_context import ( AssetProcessingContext, MergeTaskDefinition, ProcessedRegularMapData, ProcessedMergedMapData, InitialScalingInput, InitialScalingOutput, SaveVariantsInput, SaveVariantsOutput, ) from .stages.base_stage import ProcessingStage # Import the new stages we created from .stages.prepare_processing_items import PrepareProcessingItemsStage from .stages.regular_map_processor import RegularMapProcessorStage from .stages.merged_task_processor import MergedTaskProcessorStage from .stages.initial_scaling import InitialScalingStage from .stages.save_variants import SaveVariantsStage log = logging.getLogger(__name__) # --- PipelineOrchestrator Class --- class PipelineOrchestrator: """ Orchestrates the processing of assets based on source rules and a series of processing stages. Manages the overall flow, including the core item processing sequence. """ def __init__(self, config_obj: Configuration, pre_item_stages: List[ProcessingStage], post_item_stages: List[ProcessingStage]): """ Initializes the PipelineOrchestrator. Args: config_obj: The main configuration object. pre_item_stages: Stages to run before the core item processing loop. post_item_stages: Stages to run after the core item processing loop. """ self.config_obj: Configuration = config_obj self.pre_item_stages: List[ProcessingStage] = pre_item_stages self.post_item_stages: List[ProcessingStage] = post_item_stages # Instantiate the core item processing stages internally self._prepare_stage = PrepareProcessingItemsStage() self._regular_processor_stage = RegularMapProcessorStage() self._merged_processor_stage = MergedTaskProcessorStage() self._scaling_stage = InitialScalingStage() self._save_stage = SaveVariantsStage() def _execute_specific_stages( self, context: AssetProcessingContext, stages_to_run: List[ProcessingStage], stage_group_name: str, stop_on_skip: bool = True ) -> AssetProcessingContext: """Executes a specific list of stages.""" asset_name = context.asset_rule.asset_name if context.asset_rule else "Unknown" log.debug(f"Asset '{asset_name}': Executing {stage_group_name} stages...") for stage in stages_to_run: stage_name = stage.__class__.__name__ log.debug(f"Asset '{asset_name}': Executing {stage_group_name} stage: {stage_name}") try: # Check if stage expects context directly or specific input # For now, assume outer stages take context directly # This might need refinement if outer stages also adopt Input/Output pattern context = stage.execute(context) except Exception as e: log.error(f"Asset '{asset_name}': Error during outer stage '{stage_name}': {e}", exc_info=True) context.status_flags["asset_failed"] = True context.status_flags["asset_failed_stage"] = stage_name context.status_flags["asset_failed_reason"] = str(e) # Update overall metadata immediately on outer stage failure context.asset_metadata["status"] = f"Failed: Error in stage {stage_name}" context.asset_metadata["error_message"] = str(e) break # Stop processing outer stages for this asset on error if stop_on_skip and context.status_flags.get("skip_asset"): log.info(f"Asset '{asset_name}': Skipped by outer stage '{stage_name}'. Reason: {context.status_flags.get('skip_reason', 'N/A')}") break # Skip remaining outer stages for this asset return context def process_source_rule( self, source_rule: SourceRule, workspace_path: Path, output_base_path: Path, overwrite: bool, incrementing_value: Optional[str], sha5_value: Optional[str] # Keep param name consistent for now ) -> Dict[str, List[str]]: """ Processes a single source rule, applying pre-processing stages, the core item processing loop (Prepare, Process, Scale, Save), and post-processing stages. """ overall_status: Dict[str, List[str]] = { "processed": [], "skipped": [], "failed": [], } engine_temp_dir_path: Optional[Path] = None try: # --- Setup Temporary Directory --- temp_dir_path_str = tempfile.mkdtemp(prefix=self.config_obj.temp_dir_prefix) engine_temp_dir_path = Path(temp_dir_path_str) log.debug(f"PipelineOrchestrator created temporary directory: {engine_temp_dir_path}") # --- Process Each Asset Rule --- for asset_rule in source_rule.assets: asset_name = asset_rule.asset_name log.info(f"Orchestrator: Processing asset '{asset_name}'") # --- Initialize Asset Context --- context = AssetProcessingContext( source_rule=source_rule, asset_rule=asset_rule, workspace_path=workspace_path, engine_temp_dir=engine_temp_dir_path, output_base_path=output_base_path, effective_supplier=None, asset_metadata={}, processed_maps_details={}, # Final results per item merged_maps_details={}, # Keep for potential backward compat or other uses? files_to_process=[], # Populated by FileRuleFilterStage (assumed in outer_stages) loaded_data_cache={}, config_obj=self.config_obj, status_flags={"skip_asset": False, "asset_failed": False}, incrementing_value=incrementing_value, sha5_value=sha5_value, processing_items=[], # Initialize new fields intermediate_results={} ) # --- Execute Pre-Item-Processing Outer Stages --- # (e.g., MetadataInit, SupplierDet, FileRuleFilter, GlossToRough, NormalInvert) # Identify which outer stages run before the item loop # This requires knowing the intended order. Assume all run before for now. context = self._execute_specific_stages(context, self.pre_item_stages, "pre-item", stop_on_skip=True) # Check if asset should be skipped or failed after pre-processing if context.status_flags.get("asset_failed"): log.error(f"Asset '{asset_name}': Failed during pre-processing stage '{context.status_flags.get('asset_failed_stage', 'Unknown')}'. Skipping item processing.") overall_status["failed"].append(f"{asset_name} (Failed in {context.status_flags.get('asset_failed_stage', 'Pre-Processing')})") continue # Move to the next asset rule if context.status_flags.get("skip_asset"): log.info(f"Asset '{asset_name}': Skipped during pre-processing. Skipping item processing.") overall_status["skipped"].append(asset_name) continue # Move to the next asset rule # --- Prepare Processing Items --- log.debug(f"Asset '{asset_name}': Preparing processing items...") try: log.info(f"ORCHESTRATOR_TRACE: Asset '{asset_name}': Attempting to call _prepare_stage.execute(). Current context.status_flags: {context.status_flags}") # Prepare stage modifies context directly context = self._prepare_stage.execute(context) log.info(f"ORCHESTRATOR_TRACE: Asset '{asset_name}': Successfully RETURNED from _prepare_stage.execute(). context.processing_items count: {len(context.processing_items) if context.processing_items is not None else 'None'}. context.status_flags: {context.status_flags}") except Exception as e: log.error(f"ORCHESTRATOR_TRACE: Asset '{asset_name}': EXCEPTION during _prepare_stage.execute(): {e}", exc_info=True) context.status_flags["asset_failed"] = True context.status_flags["asset_failed_stage"] = "PrepareProcessingItemsStage" context.status_flags["asset_failed_reason"] = str(e) overall_status["failed"].append(f"{asset_name} (Failed in Prepare Items)") continue # Move to next asset if context.status_flags.get('prepare_items_failed'): log.error(f"Asset '{asset_name}': Failed during item preparation. Reason: {context.status_flags.get('prepare_items_failed_reason', 'Unknown')}. Skipping item processing loop.") overall_status["failed"].append(f"{asset_name} (Failed Prepare Items: {context.status_flags.get('prepare_items_failed_reason', 'Unknown')})") continue # Move to next asset if not context.processing_items: log.info(f"Asset '{asset_name}': No items to process after preparation stage.") # Status will be determined at the end # --- Core Item Processing Loop --- log.info("ORCHESTRATOR: Starting processing items loop for asset '%s'", asset_name) # Corrected indentation and message log.info(f"Asset '{asset_name}': Starting core item processing loop for {len(context.processing_items)} items...") asset_had_item_errors = False for item_index, item in enumerate(context.processing_items): item_key: Any = None # Key for storing results (FileRule object or task_key string) item_log_prefix = f"Asset '{asset_name}', Item {item_index + 1}/{len(context.processing_items)}" processed_data: Optional[Union[ProcessedRegularMapData, ProcessedMergedMapData]] = None scaled_data_output: Optional[InitialScalingOutput] = None # Store output object saved_data: Optional[SaveVariantsOutput] = None item_status = "Failed" # Default item status current_image_data: Optional[np.ndarray] = None # Track current image data ref try: # The 'item' is now expected to be a ProcessingItem or MergeTaskDefinition if isinstance(item, ProcessingItem): item_key = f"{item.source_file_info_ref}_{item.map_type_identifier}_{item.resolution_key}" item_log_prefix = f"Asset '{asset_name}', ProcItem '{item_key}'" log.info(f"{item_log_prefix}: Starting processing.") # Data for ProcessingItem is already loaded by PrepareProcessingItemsStage current_image_data = item.image_data current_dimensions = item.current_dimensions item_resolution_key = item.resolution_key # Transformations (like gloss to rough, normal invert) are assumed to be applied # by RegularMapProcessorStage if it's still used, or directly in PrepareProcessingItemsStage # before creating the ProcessingItem, or a new dedicated transformation stage. # For now, assume item.image_data is ready for scaling/saving. # Store initial ProcessingItem data as "processed_data" for consistency if RegularMapProcessor is bypassed # This is a simplification; a dedicated transformation stage would be cleaner. # For now, we assume transformations happened before or within PrepareProcessingItemsStage. # The 'processed_data' variable here is more of a placeholder for what would feed into scaling. # Create a simple ProcessedRegularMapData-like structure for logging/details if needed, # or adapt the final_details population later. # For now, we'll directly use 'item' fields. # 2. Scale (Optional) scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE") # Pass the item's resolution_key to InitialScalingInput scale_input = InitialScalingInput( image_data=current_image_data, original_dimensions=current_dimensions, initial_scaling_mode=scaling_mode, resolution_key=item_resolution_key # Pass the key ) # Add _source_file_path for logging within InitialScalingStage if available setattr(scale_input, '_source_file_path', item.source_file_info_ref) log.debug(f"{item_log_prefix}: Calling InitialScalingStage. Input res_key: {scale_input.resolution_key}") scaled_data_output = self._scaling_stage.execute(scale_input) current_image_data = scaled_data_output.scaled_image_data current_dimensions = scaled_data_output.final_dimensions # Dimensions after scaling # The resolution_key from item is passed through by InitialScalingOutput output_resolution_key = scaled_data_output.resolution_key log.debug(f"{item_log_prefix}: InitialScalingStage output. Scaled: {scaled_data_output.scaling_applied}, New Dims: {current_dimensions}, Output ResKey: {output_resolution_key}") context.intermediate_results[item_key] = scaled_data_output # 3. Save Variants if current_image_data is None or current_image_data.size == 0: log.warning(f"{item_log_prefix}: Skipping save stage because image data is empty.") context.processed_maps_details[item_key] = {"status": "Skipped", "notes": "No image data to save", "stage": "SaveVariantsStage"} continue log.debug(f"{item_log_prefix}: Preparing to save variant with resolution key '{output_resolution_key}'...") output_filename_tokens = { 'asset_name': asset_name, 'output_base_directory': context.engine_temp_dir, 'supplier': context.effective_supplier or 'UnknownSupplier', 'resolution': output_resolution_key # Use the key from the item/scaling stage } # Determine image_resolutions argument for save_image_variants save_specific_resolutions = {} if output_resolution_key == "LOWRES": # For LOWRES, the "resolution value" is its actual dimension. # image_saving_utils needs a dict like {"LOWRES": 64} if current_dim is 64x64 # Assuming current_dimensions[0] is width. save_specific_resolutions = {"LOWRES": current_dimensions[0] if current_dimensions else 0} log.debug(f"{item_log_prefix}: Preparing to save LOWRES variant. Dimensions: {current_dimensions}. Save resolutions arg: {save_specific_resolutions}") elif output_resolution_key in context.config_obj.image_resolutions: save_specific_resolutions = {output_resolution_key: context.config_obj.image_resolutions[output_resolution_key]} else: log.warning(f"{item_log_prefix}: Resolution key '{output_resolution_key}' not found in config.image_resolutions and not LOWRES. Saving might fail or use full res.") # Fallback: pass all configured resolutions, image_saving_utils will try to match by size. # This might not be ideal if the key is truly unknown. # Or, more strictly, fail here if key is unknown and not LOWRES. # For now, let image_saving_utils handle it by passing all. save_specific_resolutions = context.config_obj.image_resolutions save_input = SaveVariantsInput( image_data=current_image_data, final_internal_map_type=item.map_type_identifier, source_bit_depth_info=[item.bit_depth] if item.bit_depth is not None else [8], # Default to 8 if not set output_filename_pattern_tokens=output_filename_tokens, image_resolutions=save_specific_resolutions, # Pass the specific resolution(s) file_type_defs=context.config_obj.get_file_type_definitions_with_examples(), output_format_8bit=context.config_obj.get_8bit_output_format(), output_format_16bit_primary=context.config_obj.get_16bit_output_formats()[0], output_format_16bit_fallback=context.config_obj.get_16bit_output_formats()[1], png_compression_level=context.config_obj.png_compression_level, jpg_quality=context.config_obj.jpg_quality, output_filename_pattern=context.config_obj.output_filename_pattern, resolution_threshold_for_jpg=getattr(context.config_obj, "resolution_threshold_for_jpg", None) ) saved_data = self._save_stage.execute(save_input) if saved_data and saved_data.status.startswith("Processed"): item_status = saved_data.status log.info(f"{item_log_prefix}: Item successfully processed and saved. Status: {item_status}") context.processed_maps_details[item_key] = { "status": item_status, "saved_files_info": saved_data.saved_files_details, "internal_map_type": item.map_type_identifier, "resolution_key": output_resolution_key, "original_dimensions": item.original_dimensions, "final_dimensions": current_dimensions, # Dimensions after scaling "source_file": item.source_file_info_ref, } else: error_msg = saved_data.error_message if saved_data else "Save stage returned None" log.error(f"{item_log_prefix}: Failed during save stage. Error: {error_msg}") context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Save Error: {error_msg}", "stage": "SaveVariantsStage"} asset_had_item_errors = True item_status = "Failed" elif isinstance(item, MergeTaskDefinition): # --- This part needs similar refactoring for resolution_key if merged outputs can be LOWRES --- # --- For now, assume merged tasks always produce standard resolutions --- item_key = item.task_key item_log_prefix = f"Asset '{asset_name}', MergeTask '{item_key}'" log.info(f"{item_log_prefix}: Processing MergeTask.") # 1. Process Merge Task processed_data = self._merged_processor_stage.execute(context, item) if not processed_data or processed_data.status != "Processed": error_msg = processed_data.error_message if processed_data else "Merge processor returned None" log.error(f"{item_log_prefix}: Failed during merge processing. Error: {error_msg}") context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Merge Error: {error_msg}", "stage": "MergedTaskProcessorStage"} asset_had_item_errors = True continue context.intermediate_results[item_key] = processed_data current_image_data = processed_data.merged_image_data current_dimensions = processed_data.final_dimensions # 2. Scale Merged Output (Optional) # Merged tasks typically don't have a single "resolution_key" like LOWRES from source. # They produce an image that then gets downscaled to 1K, PREVIEW etc. # So, resolution_key for InitialScalingInput here would be None or a default. scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE") scale_input = InitialScalingInput( image_data=current_image_data, original_dimensions=current_dimensions, initial_scaling_mode=scaling_mode, resolution_key=None # Merged outputs are not "LOWRES" themselves before this scaling ) setattr(scale_input, '_source_file_path', f"MergeTask_{item_key}") # For logging log.debug(f"{item_log_prefix}: Calling InitialScalingStage for merged data.") scaled_data_output = self._scaling_stage.execute(scale_input) current_image_data = scaled_data_output.scaled_image_data current_dimensions = scaled_data_output.final_dimensions # Merged items don't have a specific output_resolution_key from source, # they will be saved to all applicable resolutions from config. # So scaled_data_output.resolution_key will be None here. context.intermediate_results[item_key] = scaled_data_output # 3. Save Merged Variants if current_image_data is None or current_image_data.size == 0: log.warning(f"{item_log_prefix}: Skipping save for merged task, image data is empty.") context.processed_maps_details[item_key] = {"status": "Skipped", "notes": "No merged image data to save", "stage": "SaveVariantsStage"} continue output_filename_tokens = { 'asset_name': asset_name, 'output_base_directory': context.engine_temp_dir, 'supplier': context.effective_supplier or 'UnknownSupplier', # 'resolution' token will be filled by image_saving_utils for each variant } # For merged tasks, we usually want to generate all standard resolutions. # The `resolution_key` from the item itself is not applicable here for the `resolution` token. # The `image_saving_utils.save_image_variants` will iterate through `context.config_obj.image_resolutions`. save_input = SaveVariantsInput( image_data=current_image_data, final_internal_map_type=processed_data.output_map_type, source_bit_depth_info=processed_data.source_bit_depths, output_filename_pattern_tokens=output_filename_tokens, image_resolutions=context.config_obj.image_resolutions, # Pass all configured resolutions file_type_defs=getattr(context.config_obj, "FILE_TYPE_DEFINITIONS", {}), output_format_8bit=context.config_obj.get_8bit_output_format(), output_format_16bit_primary=context.config_obj.get_16bit_output_formats()[0], output_format_16bit_fallback=context.config_obj.get_16bit_output_formats()[1], png_compression_level=context.config_obj.png_compression_level, jpg_quality=context.config_obj.jpg_quality, output_filename_pattern=context.config_obj.output_filename_pattern, resolution_threshold_for_jpg=getattr(context.config_obj, "resolution_threshold_for_jpg", None) ) saved_data = self._save_stage.execute(save_input) if saved_data and saved_data.status.startswith("Processed"): item_status = saved_data.status log.info(f"{item_log_prefix}: Merged task successfully processed and saved. Status: {item_status}") context.processed_maps_details[item_key] = { "status": item_status, "saved_files_info": saved_data.saved_files_details, "internal_map_type": processed_data.output_map_type, "final_dimensions": current_dimensions, } else: error_msg = saved_data.error_message if saved_data else "Save stage for merged task returned None" log.error(f"{item_log_prefix}: Failed during save stage for merged task. Error: {error_msg}") context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Save Error (Merged): {error_msg}", "stage": "SaveVariantsStage"} asset_had_item_errors = True item_status = "Failed" else: log.warning(f"{item_log_prefix}: Unknown item type in loop: {type(item)}. Skipping.") # Ensure some key exists to prevent KeyError if item_key was not set unknown_item_key = f"unknown_item_at_index_{item_index}" context.processed_maps_details[unknown_item_key] = {"status": "Skipped", "notes": f"Unknown item type {type(item)}"} asset_had_item_errors = True continue except Exception as e: log.exception(f"Asset '{asset_name}', Item Loop Index {item_index}: Unhandled exception: {e}") # Ensure details are recorded even on unhandled exception if item_key is not None: context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Unhandled Loop Error: {e}", "stage": "OrchestratorLoop"} else: log.error(f"Asset '{asset_name}': Unhandled exception in item loop before item key was set.") asset_had_item_errors = True item_status = "Failed" # Optionally break loop or continue? Continue for now to process other items. log.info("ORCHESTRATOR: Finished processing items loop for asset '%s'", asset_name) log.info(f"Asset '{asset_name}': Finished core item processing loop.") # --- Execute Post-Item-Processing Outer Stages --- # (e.g., OutputOrganization, MetadataFinalizationSave) # Identify which outer stages run after the item loop # This needs better handling based on stage purpose. Assume none run after for now. if not context.status_flags.get("asset_failed"): log.info("ORCHESTRATOR: Executing post-item-processing outer stages for asset '%s'", asset_name) context = self._execute_specific_stages(context, self.post_item_stages, "post-item", stop_on_skip=False) # --- Final Asset Status Determination --- final_asset_status = "Unknown" fail_reason = "" if context.status_flags.get("asset_failed"): final_asset_status = "Failed" fail_reason = f"(Failed in {context.status_flags.get('asset_failed_stage', 'Unknown Stage')}: {context.status_flags.get('asset_failed_reason', 'Unknown Reason')})" elif context.status_flags.get("skip_asset"): final_asset_status = "Skipped" fail_reason = f"(Skipped: {context.status_flags.get('skip_reason', 'Unknown Reason')})" elif asset_had_item_errors: final_asset_status = "Failed" fail_reason = "(One or more items failed)" elif not context.processing_items: # No items prepared, no errors -> consider skipped or processed based on definition? final_asset_status = "Skipped" # Or "Processed (No Items)" fail_reason = "(No items to process)" elif not context.processed_maps_details and context.processing_items: # Items were prepared, but none resulted in processed_maps_details entry final_asset_status = "Skipped" # Or Failed? fail_reason = "(All processing items skipped or failed internally)" elif context.processed_maps_details: # Check if all items in processed_maps_details are actually processed successfully all_processed_ok = all( str(details.get("status", "")).startswith("Processed") for details in context.processed_maps_details.values() ) some_processed_ok = any( str(details.get("status", "")).startswith("Processed") for details in context.processed_maps_details.values() ) if all_processed_ok: final_asset_status = "Processed" elif some_processed_ok: final_asset_status = "Partial" # Introduce a partial status? Or just Failed? fail_reason = "(Some items failed)" final_asset_status = "Failed" # Treat partial as Failed for overall status else: # No items processed successfully final_asset_status = "Failed" fail_reason = "(All items failed)" else: # Should not happen if processing_items existed final_asset_status = "Failed" fail_reason = "(Unknown state after item processing)" # Update overall status list if final_asset_status == "Processed": overall_status["processed"].append(asset_name) elif final_asset_status == "Skipped": overall_status["skipped"].append(f"{asset_name} {fail_reason}") else: # Failed or Unknown overall_status["failed"].append(f"{asset_name} {fail_reason}") log.info(f"Asset '{asset_name}' final status: {final_asset_status} {fail_reason}") # Clean up intermediate results for the asset to save memory context.intermediate_results = {} except Exception as e: log.error(f"PipelineOrchestrator.process_source_rule failed critically: {e}", exc_info=True) # Mark all assets from this source rule that weren't finished as failed processed_or_skipped_or_failed = set(overall_status["processed"]) | \ set(name.split(" ")[0] for name in overall_status["skipped"]) | \ set(name.split(" ")[0] for name in overall_status["failed"]) for asset_rule in source_rule.assets: if asset_rule.asset_name not in processed_or_skipped_or_failed: overall_status["failed"].append(f"{asset_rule.asset_name} (Orchestrator Error: {e})") finally: # --- Cleanup Temporary Directory --- if engine_temp_dir_path and engine_temp_dir_path.exists(): try: log.debug(f"PipelineOrchestrator cleaning up temporary directory: {engine_temp_dir_path}") shutil.rmtree(engine_temp_dir_path, ignore_errors=True) except Exception as e: log.error(f"Error cleaning up orchestrator temporary directory {engine_temp_dir_path}: {e}", exc_info=True) return overall_status