# --- Imports ---
import logging
import shutil
import tempfile
from pathlib import Path
from typing import List, Dict, Optional, Any, Union  # Added Any, Union

import numpy as np  # Added numpy

from configuration import Configuration
from rule_structure import SourceRule, AssetRule, FileRule  # Added FileRule

# Import new context classes and stages
from .asset_context import (
    AssetProcessingContext,
    MergeTaskDefinition,
    ProcessedRegularMapData,
    ProcessedMergedMapData,
    InitialScalingInput,
    InitialScalingOutput,
    SaveVariantsInput,
    SaveVariantsOutput,
)
from .stages.base_stage import ProcessingStage

# Import the new stages we created
from .stages.prepare_processing_items import PrepareProcessingItemsStage
from .stages.regular_map_processor import RegularMapProcessorStage
from .stages.merged_task_processor import MergedTaskProcessorStage
from .stages.initial_scaling import InitialScalingStage
from .stages.save_variants import SaveVariantsStage

log = logging.getLogger(__name__)


# --- PipelineOrchestrator Class ---
class PipelineOrchestrator:
    """
    Orchestrates the processing of assets based on source rules and a series of
    processing stages. Manages the overall flow, including the core item
    processing sequence.
    """

    def __init__(
        self,
        config_obj: Configuration,
        pre_item_stages: List[ProcessingStage],
        post_item_stages: List[ProcessingStage],
    ):
        """
        Initializes the PipelineOrchestrator.

        Args:
            config_obj: The main configuration object.
            pre_item_stages: Stages to run before the core item processing loop.
            post_item_stages: Stages to run after the core item processing loop.
        """
        self.config_obj: Configuration = config_obj
        self.pre_item_stages: List[ProcessingStage] = pre_item_stages
        self.post_item_stages: List[ProcessingStage] = post_item_stages

        # Instantiate the core item processing stages internally.
        self._prepare_stage = PrepareProcessingItemsStage()
        self._regular_processor_stage = RegularMapProcessorStage()
        self._merged_processor_stage = MergedTaskProcessorStage()
        self._scaling_stage = InitialScalingStage()
        self._save_stage = SaveVariantsStage()

    def _execute_specific_stages(
        self,
        context: AssetProcessingContext,
        stages_to_run: List[ProcessingStage],
        stage_group_name: str,
        stop_on_skip: bool = True,
    ) -> AssetProcessingContext:
        """Executes a specific list of stages."""
        asset_name = context.asset_rule.asset_name if context.asset_rule else "Unknown"
        log.debug(f"Asset '{asset_name}': Executing {stage_group_name} stages...")
        for stage in stages_to_run:
            stage_name = stage.__class__.__name__
            log.debug(f"Asset '{asset_name}': Executing {stage_group_name} stage: {stage_name}")
            try:
                # Outer stages currently take the context directly; this may
                # need refinement if they also adopt the Input/Output pattern.
                context = stage.execute(context)
            except Exception as e:
                log.error(f"Asset '{asset_name}': Error during outer stage '{stage_name}': {e}", exc_info=True)
                context.status_flags["asset_failed"] = True
                context.status_flags["asset_failed_stage"] = stage_name
                context.status_flags["asset_failed_reason"] = str(e)
                # Update overall metadata immediately on outer stage failure.
                context.asset_metadata["status"] = f"Failed: Error in stage {stage_name}"
                context.asset_metadata["error_message"] = str(e)
                break  # Stop processing outer stages for this asset on error.
            if stop_on_skip and context.status_flags.get("skip_asset"):
                log.info(
                    f"Asset '{asset_name}': Skipped by outer stage '{stage_name}'. "
                    f"Reason: {context.status_flags.get('skip_reason', 'N/A')}"
                )
                break  # Skip remaining outer stages for this asset.
        return context
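    # The outer stages passed in via pre_item_stages/post_item_stages are
    # assumed to follow ProcessingStage's context-in/context-out contract used
    # above. A minimal hypothetical stage (illustrative only, not part of this
    # module) would look like:
    #
    #     class ExampleOuterStage(ProcessingStage):
    #         def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
    #             # Inspect or mutate the shared context, setting status_flags
    #             # such as "skip_asset" or "asset_failed" to steer the
    #             # orchestrator, then return the context.
    #             return context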
    def process_source_rule(
        self,
        source_rule: SourceRule,
        workspace_path: Path,
        output_base_path: Path,
        overwrite: bool,
        incrementing_value: Optional[str],
        sha5_value: Optional[str],  # Keep param name consistent for now
    ) -> Dict[str, List[str]]:
        """
        Processes a single source rule, applying pre-processing stages, the
        core item processing loop (Prepare, Process, Scale, Save), and
        post-processing stages.
        """
        overall_status: Dict[str, List[str]] = {
            "processed": [],
            "skipped": [],
            "failed": [],
        }
        engine_temp_dir_path: Optional[Path] = None
        try:
            # --- Setup Temporary Directory ---
            temp_dir_path_str = tempfile.mkdtemp(prefix=self.config_obj.temp_dir_prefix)
            engine_temp_dir_path = Path(temp_dir_path_str)
            log.debug(f"PipelineOrchestrator created temporary directory: {engine_temp_dir_path}")

            # --- Process Each Asset Rule ---
            for asset_rule in source_rule.assets:
                asset_name = asset_rule.asset_name
                log.info(f"Orchestrator: Processing asset '{asset_name}'")

                # --- Initialize Asset Context ---
                context = AssetProcessingContext(
                    source_rule=source_rule,
                    asset_rule=asset_rule,
                    workspace_path=workspace_path,
                    engine_temp_dir=engine_temp_dir_path,
                    output_base_path=output_base_path,
                    effective_supplier=None,
                    asset_metadata={},
                    processed_maps_details={},  # Final results per item
                    merged_maps_details={},  # Keep for potential backward compat or other uses?
                    files_to_process=[],  # Populated by FileRuleFilterStage (assumed in outer stages)
                    loaded_data_cache={},
                    config_obj=self.config_obj,
                    status_flags={"skip_asset": False, "asset_failed": False},
                    incrementing_value=incrementing_value,
                    sha5_value=sha5_value,
                    processing_items=[],  # Initialize new fields
                    intermediate_results={},
                )

                # --- Execute Pre-Item-Processing Outer Stages ---
                # (e.g., MetadataInit, SupplierDet, FileRuleFilter, GlossToRough, NormalInvert)
                # Identifying which outer stages run before the item loop requires
                # knowing the intended order; assume all of them do for now.
                context = self._execute_specific_stages(context, self.pre_item_stages, "pre-item", stop_on_skip=True)

                # Check if the asset failed or was skipped during pre-processing.
                if context.status_flags.get("asset_failed"):
                    log.error(
                        f"Asset '{asset_name}': Failed during pre-processing stage "
                        f"'{context.status_flags.get('asset_failed_stage', 'Unknown')}'. Skipping item processing."
                    )
                    overall_status["failed"].append(
                        f"{asset_name} (Failed in {context.status_flags.get('asset_failed_stage', 'Pre-Processing')})"
                    )
                    continue  # Move to the next asset rule
                if context.status_flags.get("skip_asset"):
                    log.info(f"Asset '{asset_name}': Skipped during pre-processing. Skipping item processing.")
                    overall_status["skipped"].append(asset_name)
                    continue  # Move to the next asset rule

                # --- Prepare Processing Items ---
                log.debug(f"Asset '{asset_name}': Preparing processing items...")
                try:
                    log.info(
                        f"ORCHESTRATOR_TRACE: Asset '{asset_name}': Attempting to call _prepare_stage.execute(). "
                        f"Current context.status_flags: {context.status_flags}"
                    )
                    # The prepare stage modifies the context directly.
                    context = self._prepare_stage.execute(context)
                    log.info(
                        f"ORCHESTRATOR_TRACE: Asset '{asset_name}': Successfully RETURNED from _prepare_stage.execute(). "
                        f"context.processing_items count: "
                        f"{len(context.processing_items) if context.processing_items is not None else 'None'}. "
                        f"context.status_flags: {context.status_flags}"
                    )
                except Exception as e:
                    log.error(f"ORCHESTRATOR_TRACE: Asset '{asset_name}': EXCEPTION during _prepare_stage.execute(): {e}", exc_info=True)
                    context.status_flags["asset_failed"] = True
                    context.status_flags["asset_failed_stage"] = "PrepareProcessingItemsStage"
                    context.status_flags["asset_failed_reason"] = str(e)
                    overall_status["failed"].append(f"{asset_name} (Failed in Prepare Items)")
                    continue  # Move to next asset

                if context.status_flags.get('prepare_items_failed'):
                    log.error(
                        f"Asset '{asset_name}': Failed during item preparation. "
                        f"Reason: {context.status_flags.get('prepare_items_failed_reason', 'Unknown')}. "
                        f"Skipping item processing loop."
                    )
                    overall_status["failed"].append(
                        f"{asset_name} (Failed Prepare Items: {context.status_flags.get('prepare_items_failed_reason', 'Unknown')})"
                    )
                    continue  # Move to next asset

                if not context.processing_items:
                    log.info(f"Asset '{asset_name}': No items to process after preparation stage.")
                    # Status will be determined at the end.

                # --- Core Item Processing Loop ---
                log.info("ORCHESTRATOR: Starting processing items loop for asset '%s'", asset_name)
                log.info(f"Asset '{asset_name}': Starting core item processing loop for {len(context.processing_items)} items...")
                asset_had_item_errors = False
                for item_index, item in enumerate(context.processing_items):
                    item_key: Any = None  # Key for storing results (FileRule object or task_key string)
                    item_log_prefix = f"Asset '{asset_name}', Item {item_index + 1}/{len(context.processing_items)}"
                    processed_data: Optional[Union[ProcessedRegularMapData, ProcessedMergedMapData]] = None
                    scaled_data_output: Optional[InitialScalingOutput] = None  # Store output object
                    saved_data: Optional[SaveVariantsOutput] = None
                    item_status = "Failed"  # Default item status
                    current_image_data: Optional[np.ndarray] = None  # Track current image data reference

                    try:
                        # 1. Process (Load/Merge + Transform)
                        if isinstance(item, FileRule):
                            if item.item_type == 'EXTRA':
                                log.debug(f"{item_log_prefix}: Skipping image processing for EXTRA FileRule '{item.file_path}'.")
                                # Add a basic entry to processed_maps_details to acknowledge it was seen.
                                context.processed_maps_details[item.file_path] = {
                                    "status": "Skipped (EXTRA file)",
                                    "internal_map_type": "EXTRA",
                                    "source_file": str(item.file_path),
                                }
                                continue  # Skip to the next item

                            item_key = item.file_path  # Use file_path string as key
                            log.debug(f"{item_log_prefix}: Processing FileRule '{item.file_path}'...")
                            processed_data = self._regular_processor_stage.execute(context, item)
                        elif isinstance(item, MergeTaskDefinition):
                            item_key = item.task_key  # Use task_key string as key
                            log.info(f"{item_log_prefix}: Executing MergedTaskProcessorStage for MergeTask '{item_key}'...")
                            processed_data = self._merged_processor_stage.execute(context, item)
                            # Log status/error from the merge processor.
                            if processed_data:
                                log.info(
                                    f"{item_log_prefix}: MergedTaskProcessorStage result - "
                                    f"Status: {processed_data.status}, Error: {processed_data.error_message}"
                                )
                            else:
                                log.warning(f"{item_log_prefix}: MergedTaskProcessorStage returned None for MergeTask '{item_key}'.")
                        else:
                            log.warning(f"{item_log_prefix}: Unknown item type '{type(item)}'. Skipping.")
Skipping.") item_key = f"unknown_item_{item_index}" context.processed_maps_details[item_key] = {"status": "Skipped", "notes": f"Unknown item type {type(item)}"} asset_had_item_errors = True continue # Next item # Check for processing failure if not processed_data or processed_data.status != "Processed": error_msg = processed_data.error_message if processed_data else "Processor returned None" log.error(f"{item_log_prefix}: Failed during processing stage. Error: {error_msg}") context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Processing Error: {error_msg}", "stage": processed_data.__class__.__name__ if processed_data else "UnknownProcessor"} asset_had_item_errors = True continue # Next item # Store intermediate result & get current image data context.intermediate_results[item_key] = processed_data current_image_data = processed_data.processed_image_data if isinstance(processed_data, ProcessedRegularMapData) else processed_data.merged_image_data current_dimensions = processed_data.original_dimensions if isinstance(processed_data, ProcessedRegularMapData) else processed_data.final_dimensions # 2. Scale (Optional) scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE") if scaling_mode != "NONE" and current_image_data is not None and current_image_data.size > 0: if isinstance(item, MergeTaskDefinition): # Log scaling call for merge tasks log.info(f"{item_log_prefix}: Calling InitialScalingStage for MergeTask '{item_key}' (Mode: {scaling_mode})...") log.debug(f"{item_log_prefix}: Applying initial scaling (Mode: {scaling_mode})...") scale_input = InitialScalingInput( image_data=current_image_data, original_dimensions=current_dimensions, # Pass original/merged dims initial_scaling_mode=scaling_mode ) scaled_data_output = self._scaling_stage.execute(scale_input) # Update intermediate result and current image data reference context.intermediate_results[item_key] = scaled_data_output # Overwrite previous intermediate current_image_data = scaled_data_output.scaled_image_data # Use scaled data for saving log.debug(f"{item_log_prefix}: Scaling applied: {scaled_data_output.scaling_applied}. New Dims: {scaled_data_output.final_dimensions}") else: log.debug(f"{item_log_prefix}: Initial scaling skipped (Mode: NONE or empty image).") # Create dummy output if scaling skipped, using current dims final_dims = current_dimensions if current_dimensions else (current_image_data.shape[1], current_image_data.shape[0]) if current_image_data is not None else (0,0) scaled_data_output = InitialScalingOutput(scaled_image_data=current_image_data, scaling_applied=False, final_dimensions=final_dims) # 3. 
                        if current_image_data is None or current_image_data.size == 0:
                            log.warning(f"{item_log_prefix}: Skipping save stage because image data is empty.")
                            context.processed_maps_details[item_key] = {
                                "status": "Skipped",
                                "notes": "No image data to save",
                                "stage": "SaveVariantsStage",
                            }
                            # Don't mark as an asset error; just skip this item's saving.
                            continue  # Next item

                        if isinstance(item, MergeTaskDefinition):
                            # Log save call for merge tasks.
                            log.info(f"{item_log_prefix}: Calling SaveVariantsStage for MergeTask '{item_key}'...")
                        log.debug(f"{item_log_prefix}: Saving variants...")

                        # Prepare input for the save stage.
                        internal_map_type = (
                            processed_data.final_internal_map_type
                            if isinstance(processed_data, ProcessedRegularMapData)
                            else processed_data.output_map_type
                        )
                        source_bit_depth = (
                            [processed_data.original_bit_depth]
                            if isinstance(processed_data, ProcessedRegularMapData) and processed_data.original_bit_depth is not None
                            else processed_data.source_bit_depths
                            if isinstance(processed_data, ProcessedMergedMapData)
                            else [8]  # Default bit depth if unknown
                        )

                        # Construct filename tokens (ensure the temp dir is used).
                        output_filename_tokens = {
                            'asset_name': asset_name,
                            'output_base_directory': context.engine_temp_dir,  # Save variants to temp dir
                            # Add other tokens from context/config as needed by the pattern.
                            'supplier': context.effective_supplier or 'UnknownSupplier',
                        }

                        # Log the threshold value read from config before creating the input object.
                        log.info(
                            f"ORCHESTRATOR_DEBUG: Reading RESOLUTION_THRESHOLD_FOR_JPG from config for SaveVariantsInput: "
                            f"{getattr(context.config_obj, 'RESOLUTION_THRESHOLD_FOR_JPG', None)}"
                        )

                        save_input = SaveVariantsInput(
                            image_data=current_image_data,  # Use potentially scaled data
                            internal_map_type=internal_map_type,
                            source_bit_depth_info=source_bit_depth,
                            output_filename_pattern_tokens=output_filename_tokens,
                            # Pass config values needed by the save stage.
                            image_resolutions=context.config_obj.image_resolutions,
                            file_type_defs=getattr(context.config_obj, "FILE_TYPE_DEFINITIONS", {}),
                            output_format_8bit=context.config_obj.get_8bit_output_format(),
                            output_format_16bit_primary=context.config_obj.get_16bit_output_formats()[0],
                            output_format_16bit_fallback=context.config_obj.get_16bit_output_formats()[1],
                            png_compression_level=context.config_obj.png_compression_level,
                            jpg_quality=context.config_obj.jpg_quality,
                            output_filename_pattern=context.config_obj.output_filename_pattern,
                            resolution_threshold_for_jpg=getattr(context.config_obj, "resolution_threshold_for_jpg", None),  # Corrected case
                        )
                        saved_data = self._save_stage.execute(save_input)

                        # Log saved_data for merge tasks.
                        if isinstance(item, MergeTaskDefinition):
                            log.info(
                                f"{item_log_prefix}: SaveVariantsStage result for MergeTask '{item_key}' - "
                                f"Status: {saved_data.status if saved_data else 'N/A'}, "
                                f"Saved Files: {len(saved_data.saved_files_details) if saved_data else 0}"
                            )

                        # Check save status and finalize the item result.
                        if saved_data and saved_data.status.startswith("Processed"):
                            item_status = saved_data.status  # e.g., "Processed" or "Processed (No Output)"
                            log.info(f"{item_log_prefix}: Item successfully processed and saved. Status: {item_status}")
                            # Populate final details for this item.
                            final_details = {
                                "status": item_status,
                                "saved_files_info": saved_data.saved_files_details,  # List of dicts from save util
                                "internal_map_type": internal_map_type,
                                "original_dimensions": processed_data.original_dimensions if isinstance(processed_data, ProcessedRegularMapData) else None,
                                "final_dimensions": scaled_data_output.final_dimensions if scaled_data_output else current_dimensions,
                                "transformations": processed_data.transformations_applied if isinstance(processed_data, ProcessedRegularMapData) else processed_data.transformations_applied_to_inputs,
                                # Add source file if this is a regular map.
                                "source_file": str(processed_data.source_file_path) if isinstance(processed_data, ProcessedRegularMapData) else None,
                            }
                            # Log final details addition for merge tasks.
                            if isinstance(item, MergeTaskDefinition):
                                log.info(
                                    f"{item_log_prefix}: Adding final details to context.processed_maps_details "
                                    f"for MergeTask '{item_key}'. Details: {final_details}"
                                )
                            context.processed_maps_details[item_key] = final_details
                        else:
                            error_msg = saved_data.error_message if saved_data else "Save stage returned None"
                            log.error(f"{item_log_prefix}: Failed during save stage. Error: {error_msg}")
                            context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Save Error: {error_msg}", "stage": "SaveVariantsStage"}
                            asset_had_item_errors = True
                            item_status = "Failed"  # Ensure item status reflects failure

                    except Exception as e:
                        log.exception(f"{item_log_prefix}: Unhandled exception during item processing loop: {e}")
                        # Ensure details are recorded even on an unhandled exception.
                        if item_key is not None:
                            context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Unhandled Loop Error: {e}", "stage": "OrchestratorLoop"}
                        else:
                            log.error(f"Asset '{asset_name}': Unhandled exception in item loop before item key was set.")
                        asset_had_item_errors = True
                        item_status = "Failed"
                        # Continue to process the remaining items rather than aborting the loop.

                log.info("ORCHESTRATOR: Finished processing items loop for asset '%s'", asset_name)
                log.info(f"Asset '{asset_name}': Finished core item processing loop.")

                # --- Execute Post-Item-Processing Outer Stages ---
                # (e.g., OutputOrganization, MetadataFinalizationSave)
                # Which outer stages run after the item loop needs better handling
                # based on stage purpose.
                if not context.status_flags.get("asset_failed"):
                    log.info("ORCHESTRATOR: Executing post-item-processing outer stages for asset '%s'", asset_name)
                    context = self._execute_specific_stages(context, self.post_item_stages, "post-item", stop_on_skip=False)

                # --- Final Asset Status Determination ---
                final_asset_status = "Unknown"
                fail_reason = ""
                if context.status_flags.get("asset_failed"):
                    final_asset_status = "Failed"
                    fail_reason = (
                        f"(Failed in {context.status_flags.get('asset_failed_stage', 'Unknown Stage')}: "
                        f"{context.status_flags.get('asset_failed_reason', 'Unknown Reason')})"
                    )
                elif context.status_flags.get("skip_asset"):
                    final_asset_status = "Skipped"
                    fail_reason = f"(Skipped: {context.status_flags.get('skip_reason', 'Unknown Reason')})"
                elif asset_had_item_errors:
                    final_asset_status = "Failed"
                    fail_reason = "(One or more items failed)"
                elif not context.processing_items:
                    # No items prepared, no errors -> consider skipped or processed based on definition?
final_asset_status = "Skipped" # Or "Processed (No Items)" fail_reason = "(No items to process)" elif not context.processed_maps_details and context.processing_items: # Items were prepared, but none resulted in processed_maps_details entry final_asset_status = "Skipped" # Or Failed? fail_reason = "(All processing items skipped or failed internally)" elif context.processed_maps_details: # Check if all items in processed_maps_details are actually processed successfully all_processed_ok = all( str(details.get("status", "")).startswith("Processed") for details in context.processed_maps_details.values() ) some_processed_ok = any( str(details.get("status", "")).startswith("Processed") for details in context.processed_maps_details.values() ) if all_processed_ok: final_asset_status = "Processed" elif some_processed_ok: final_asset_status = "Partial" # Introduce a partial status? Or just Failed? fail_reason = "(Some items failed)" final_asset_status = "Failed" # Treat partial as Failed for overall status else: # No items processed successfully final_asset_status = "Failed" fail_reason = "(All items failed)" else: # Should not happen if processing_items existed final_asset_status = "Failed" fail_reason = "(Unknown state after item processing)" # Update overall status list if final_asset_status == "Processed": overall_status["processed"].append(asset_name) elif final_asset_status == "Skipped": overall_status["skipped"].append(f"{asset_name} {fail_reason}") else: # Failed or Unknown overall_status["failed"].append(f"{asset_name} {fail_reason}") log.info(f"Asset '{asset_name}' final status: {final_asset_status} {fail_reason}") # Clean up intermediate results for the asset to save memory context.intermediate_results = {} except Exception as e: log.error(f"PipelineOrchestrator.process_source_rule failed critically: {e}", exc_info=True) # Mark all assets from this source rule that weren't finished as failed processed_or_skipped_or_failed = set(overall_status["processed"]) | \ set(name.split(" ")[0] for name in overall_status["skipped"]) | \ set(name.split(" ")[0] for name in overall_status["failed"]) for asset_rule in source_rule.assets: if asset_rule.asset_name not in processed_or_skipped_or_failed: overall_status["failed"].append(f"{asset_rule.asset_name} (Orchestrator Error: {e})") finally: # --- Cleanup Temporary Directory --- if engine_temp_dir_path and engine_temp_dir_path.exists(): try: log.debug(f"PipelineOrchestrator cleaning up temporary directory: {engine_temp_dir_path}") shutil.rmtree(engine_temp_dir_path, ignore_errors=True) except Exception as e: log.error(f"Error cleaning up orchestrator temporary directory {engine_temp_dir_path}: {e}", exc_info=True) return overall_status