410 lines
26 KiB
Python
410 lines
26 KiB
Python
# --- Imports ---
|
|
import logging
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional, Any, Union # Added Any, Union
|
|
|
|
import numpy as np # Added numpy
|
|
|
|
from configuration import Configuration
|
|
from rule_structure import SourceRule, AssetRule, FileRule # Added FileRule
|
|
|
|
# Import new context classes and stages
|
|
from .asset_context import (
|
|
AssetProcessingContext,
|
|
MergeTaskDefinition,
|
|
ProcessedRegularMapData,
|
|
ProcessedMergedMapData,
|
|
InitialScalingInput,
|
|
InitialScalingOutput,
|
|
SaveVariantsInput,
|
|
SaveVariantsOutput,
|
|
)
|
|
from .stages.base_stage import ProcessingStage
|
|
# Import the new stages we created
|
|
from .stages.prepare_processing_items import PrepareProcessingItemsStage
|
|
from .stages.regular_map_processor import RegularMapProcessorStage
|
|
from .stages.merged_task_processor import MergedTaskProcessorStage
|
|
from .stages.initial_scaling import InitialScalingStage
|
|
from .stages.save_variants import SaveVariantsStage
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# --- PipelineOrchestrator Class ---
|
|
|
|
class PipelineOrchestrator:
|
|
"""
|
|
Orchestrates the processing of assets based on source rules and a series of processing stages.
|
|
Manages the overall flow, including the core item processing sequence.
|
|
"""
|
|
|
|
def __init__(self, config_obj: Configuration,
|
|
pre_item_stages: List[ProcessingStage],
|
|
post_item_stages: List[ProcessingStage]):
|
|
"""
|
|
Initializes the PipelineOrchestrator.
|
|
|
|
Args:
|
|
config_obj: The main configuration object.
|
|
pre_item_stages: Stages to run before the core item processing loop.
|
|
post_item_stages: Stages to run after the core item processing loop.
|
|
"""
|
|
self.config_obj: Configuration = config_obj
|
|
self.pre_item_stages: List[ProcessingStage] = pre_item_stages
|
|
self.post_item_stages: List[ProcessingStage] = post_item_stages
|
|
# Instantiate the core item processing stages internally
|
|
self._prepare_stage = PrepareProcessingItemsStage()
|
|
self._regular_processor_stage = RegularMapProcessorStage()
|
|
self._merged_processor_stage = MergedTaskProcessorStage()
|
|
self._scaling_stage = InitialScalingStage()
|
|
self._save_stage = SaveVariantsStage()
|
|
|
|
def _execute_specific_stages(
|
|
self, context: AssetProcessingContext,
|
|
stages_to_run: List[ProcessingStage],
|
|
stage_group_name: str,
|
|
stop_on_skip: bool = True
|
|
) -> AssetProcessingContext:
|
|
"""Executes a specific list of stages."""
|
|
asset_name = context.asset_rule.asset_name if context.asset_rule else "Unknown"
|
|
log.debug(f"Asset '{asset_name}': Executing {stage_group_name} stages...")
|
|
for stage in stages_to_run:
|
|
stage_name = stage.__class__.__name__
|
|
log.debug(f"Asset '{asset_name}': Executing {stage_group_name} stage: {stage_name}")
|
|
try:
|
|
# Check if stage expects context directly or specific input
|
|
# For now, assume outer stages take context directly
|
|
# This might need refinement if outer stages also adopt Input/Output pattern
|
|
context = stage.execute(context)
|
|
except Exception as e:
|
|
log.error(f"Asset '{asset_name}': Error during outer stage '{stage_name}': {e}", exc_info=True)
|
|
context.status_flags["asset_failed"] = True
|
|
context.status_flags["asset_failed_stage"] = stage_name
|
|
context.status_flags["asset_failed_reason"] = str(e)
|
|
# Update overall metadata immediately on outer stage failure
|
|
context.asset_metadata["status"] = f"Failed: Error in stage {stage_name}"
|
|
context.asset_metadata["error_message"] = str(e)
|
|
break # Stop processing outer stages for this asset on error
|
|
|
|
if stop_on_skip and context.status_flags.get("skip_asset"):
|
|
log.info(f"Asset '{asset_name}': Skipped by outer stage '{stage_name}'. Reason: {context.status_flags.get('skip_reason', 'N/A')}")
|
|
break # Skip remaining outer stages for this asset
|
|
return context
|
|
|
|
def process_source_rule(
|
|
self,
|
|
source_rule: SourceRule,
|
|
workspace_path: Path,
|
|
output_base_path: Path,
|
|
overwrite: bool,
|
|
incrementing_value: Optional[str],
|
|
sha5_value: Optional[str] # Keep param name consistent for now
|
|
) -> Dict[str, List[str]]:
|
|
"""
|
|
Processes a single source rule, applying pre-processing stages,
|
|
the core item processing loop (Prepare, Process, Scale, Save),
|
|
and post-processing stages.
|
|
"""
|
|
overall_status: Dict[str, List[str]] = {
|
|
"processed": [],
|
|
"skipped": [],
|
|
"failed": [],
|
|
}
|
|
engine_temp_dir_path: Optional[Path] = None
|
|
|
|
try:
|
|
# --- Setup Temporary Directory ---
|
|
temp_dir_path_str = tempfile.mkdtemp(prefix=self.config_obj.temp_dir_prefix)
|
|
engine_temp_dir_path = Path(temp_dir_path_str)
|
|
log.debug(f"PipelineOrchestrator created temporary directory: {engine_temp_dir_path}")
|
|
|
|
# --- Process Each Asset Rule ---
|
|
for asset_rule in source_rule.assets:
|
|
asset_name = asset_rule.asset_name
|
|
log.info(f"Orchestrator: Processing asset '{asset_name}'")
|
|
|
|
# --- Initialize Asset Context ---
|
|
context = AssetProcessingContext(
|
|
source_rule=source_rule,
|
|
asset_rule=asset_rule,
|
|
workspace_path=workspace_path,
|
|
engine_temp_dir=engine_temp_dir_path,
|
|
output_base_path=output_base_path,
|
|
effective_supplier=None,
|
|
asset_metadata={},
|
|
processed_maps_details={}, # Final results per item
|
|
merged_maps_details={}, # Keep for potential backward compat or other uses?
|
|
files_to_process=[], # Populated by FileRuleFilterStage (assumed in outer_stages)
|
|
loaded_data_cache={},
|
|
config_obj=self.config_obj,
|
|
status_flags={"skip_asset": False, "asset_failed": False},
|
|
incrementing_value=incrementing_value,
|
|
sha5_value=sha5_value,
|
|
processing_items=[], # Initialize new fields
|
|
intermediate_results={}
|
|
)
|
|
|
|
# --- Execute Pre-Item-Processing Outer Stages ---
|
|
# (e.g., MetadataInit, SupplierDet, FileRuleFilter, GlossToRough, NormalInvert)
|
|
# Identify which outer stages run before the item loop
|
|
# This requires knowing the intended order. Assume all run before for now.
|
|
context = self._execute_specific_stages(context, self.pre_item_stages, "pre-item", stop_on_skip=True)
|
|
|
|
# Check if asset should be skipped or failed after pre-processing
|
|
if context.status_flags.get("asset_failed"):
|
|
log.error(f"Asset '{asset_name}': Failed during pre-processing stage '{context.status_flags.get('asset_failed_stage', 'Unknown')}'. Skipping item processing.")
|
|
overall_status["failed"].append(f"{asset_name} (Failed in {context.status_flags.get('asset_failed_stage', 'Pre-Processing')})")
|
|
continue # Move to the next asset rule
|
|
|
|
if context.status_flags.get("skip_asset"):
|
|
log.info(f"Asset '{asset_name}': Skipped during pre-processing. Skipping item processing.")
|
|
overall_status["skipped"].append(asset_name)
|
|
continue # Move to the next asset rule
|
|
|
|
# --- Prepare Processing Items ---
|
|
log.debug(f"Asset '{asset_name}': Preparing processing items...")
|
|
try:
|
|
# Prepare stage modifies context directly
|
|
context = self._prepare_stage.execute(context)
|
|
except Exception as e:
|
|
log.error(f"Asset '{asset_name}': Error during PrepareProcessingItemsStage: {e}", exc_info=True)
|
|
context.status_flags["asset_failed"] = True
|
|
context.status_flags["asset_failed_stage"] = "PrepareProcessingItemsStage"
|
|
context.status_flags["asset_failed_reason"] = str(e)
|
|
overall_status["failed"].append(f"{asset_name} (Failed in Prepare Items)")
|
|
continue # Move to next asset
|
|
|
|
if context.status_flags.get('prepare_items_failed'):
|
|
log.error(f"Asset '{asset_name}': Failed during item preparation. Reason: {context.status_flags.get('prepare_items_failed_reason', 'Unknown')}. Skipping item processing loop.")
|
|
overall_status["failed"].append(f"{asset_name} (Failed Prepare Items: {context.status_flags.get('prepare_items_failed_reason', 'Unknown')})")
|
|
continue # Move to next asset
|
|
|
|
if not context.processing_items:
|
|
log.info(f"Asset '{asset_name}': No items to process after preparation stage.")
|
|
# Status will be determined at the end
|
|
|
|
# --- Core Item Processing Loop ---
|
|
log.info("ORCHESTRATOR: Starting processing items loop for asset '%s'", asset_name) # Corrected indentation and message
|
|
log.info(f"Asset '{asset_name}': Starting core item processing loop for {len(context.processing_items)} items...")
|
|
asset_had_item_errors = False
|
|
for item_index, item in enumerate(context.processing_items):
|
|
item_key: Any = None # Key for storing results (FileRule object or task_key string)
|
|
item_log_prefix = f"Asset '{asset_name}', Item {item_index + 1}/{len(context.processing_items)}"
|
|
processed_data: Optional[Union[ProcessedRegularMapData, ProcessedMergedMapData]] = None
|
|
scaled_data_output: Optional[InitialScalingOutput] = None # Store output object
|
|
saved_data: Optional[SaveVariantsOutput] = None
|
|
item_status = "Failed" # Default item status
|
|
current_image_data: Optional[np.ndarray] = None # Track current image data ref
|
|
|
|
try:
|
|
# 1. Process (Load/Merge + Transform)
|
|
if isinstance(item, FileRule):
|
|
item_key = item.file_path # Use file_path string as key
|
|
log.debug(f"{item_log_prefix}: Processing FileRule '{item.file_path}'...")
|
|
processed_data = self._regular_processor_stage.execute(context, item)
|
|
elif isinstance(item, MergeTaskDefinition):
|
|
item_key = item.task_key # Use task_key string as key
|
|
log.debug(f"{item_log_prefix}: Processing MergeTask '{item_key}'...")
|
|
processed_data = self._merged_processor_stage.execute(context, item)
|
|
else:
|
|
log.warning(f"{item_log_prefix}: Unknown item type '{type(item)}'. Skipping.")
|
|
item_key = f"unknown_item_{item_index}"
|
|
context.processed_maps_details[item_key] = {"status": "Skipped", "notes": f"Unknown item type {type(item)}"}
|
|
asset_had_item_errors = True
|
|
continue # Next item
|
|
|
|
# Check for processing failure
|
|
if not processed_data or processed_data.status != "Processed":
|
|
error_msg = processed_data.error_message if processed_data else "Processor returned None"
|
|
log.error(f"{item_log_prefix}: Failed during processing stage. Error: {error_msg}")
|
|
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Processing Error: {error_msg}", "stage": processed_data.__class__.__name__ if processed_data else "UnknownProcessor"}
|
|
asset_had_item_errors = True
|
|
continue # Next item
|
|
|
|
# Store intermediate result & get current image data
|
|
context.intermediate_results[item_key] = processed_data
|
|
current_image_data = processed_data.processed_image_data if isinstance(processed_data, ProcessedRegularMapData) else processed_data.merged_image_data
|
|
current_dimensions = processed_data.original_dimensions if isinstance(processed_data, ProcessedRegularMapData) else processed_data.final_dimensions
|
|
|
|
# 2. Scale (Optional)
|
|
scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE")
|
|
if scaling_mode != "NONE" and current_image_data is not None and current_image_data.size > 0:
|
|
log.debug(f"{item_log_prefix}: Applying initial scaling (Mode: {scaling_mode})...")
|
|
scale_input = InitialScalingInput(
|
|
image_data=current_image_data,
|
|
original_dimensions=current_dimensions, # Pass original/merged dims
|
|
initial_scaling_mode=scaling_mode
|
|
)
|
|
scaled_data_output = self._scaling_stage.execute(scale_input)
|
|
# Update intermediate result and current image data reference
|
|
context.intermediate_results[item_key] = scaled_data_output # Overwrite previous intermediate
|
|
current_image_data = scaled_data_output.scaled_image_data # Use scaled data for saving
|
|
log.debug(f"{item_log_prefix}: Scaling applied: {scaled_data_output.scaling_applied}. New Dims: {scaled_data_output.final_dimensions}")
|
|
else:
|
|
log.debug(f"{item_log_prefix}: Initial scaling skipped (Mode: NONE or empty image).")
|
|
# Create dummy output if scaling skipped, using current dims
|
|
final_dims = current_dimensions if current_dimensions else (current_image_data.shape[1], current_image_data.shape[0]) if current_image_data is not None else (0,0)
|
|
scaled_data_output = InitialScalingOutput(scaled_image_data=current_image_data, scaling_applied=False, final_dimensions=final_dims)
|
|
|
|
|
|
# 3. Save Variants
|
|
if current_image_data is None or current_image_data.size == 0:
|
|
log.warning(f"{item_log_prefix}: Skipping save stage because image data is empty.")
|
|
context.processed_maps_details[item_key] = {"status": "Skipped", "notes": "No image data to save", "stage": "SaveVariantsStage"}
|
|
# Don't mark as asset error, just skip this item's saving
|
|
continue # Next item
|
|
|
|
log.debug(f"{item_log_prefix}: Saving variants...")
|
|
# Prepare input for save stage
|
|
internal_map_type = processed_data.final_internal_map_type if isinstance(processed_data, ProcessedRegularMapData) else processed_data.output_map_type
|
|
source_bit_depth = [processed_data.original_bit_depth] if isinstance(processed_data, ProcessedRegularMapData) and processed_data.original_bit_depth is not None else processed_data.source_bit_depths if isinstance(processed_data, ProcessedMergedMapData) else [8] # Default bit depth if unknown
|
|
|
|
# Construct filename tokens (ensure temp dir is used)
|
|
output_filename_tokens = {
|
|
'asset_name': asset_name,
|
|
'output_base_directory': context.engine_temp_dir, # Save variants to temp dir
|
|
# Add other tokens from context/config as needed by the pattern
|
|
'supplier': context.effective_supplier or 'UnknownSupplier',
|
|
}
|
|
|
|
save_input = SaveVariantsInput(
|
|
image_data=current_image_data, # Use potentially scaled data
|
|
internal_map_type=internal_map_type,
|
|
source_bit_depth_info=source_bit_depth,
|
|
output_filename_pattern_tokens=output_filename_tokens,
|
|
# Pass config values needed by save stage
|
|
image_resolutions=context.config_obj.image_resolutions,
|
|
file_type_defs=getattr(context.config_obj, "FILE_TYPE_DEFINITIONS", {}),
|
|
output_format_8bit=context.config_obj.get_8bit_output_format(),
|
|
output_format_16bit_primary=context.config_obj.get_16bit_output_formats()[0],
|
|
output_format_16bit_fallback=context.config_obj.get_16bit_output_formats()[1],
|
|
png_compression_level=context.config_obj.png_compression_level,
|
|
jpg_quality=context.config_obj.jpg_quality,
|
|
output_filename_pattern=context.config_obj.output_filename_pattern,
|
|
)
|
|
saved_data = self._save_stage.execute(save_input)
|
|
|
|
# Check save status and finalize item result
|
|
if saved_data and saved_data.status.startswith("Processed"):
|
|
item_status = saved_data.status # e.g., "Processed" or "Processed (No Output)"
|
|
log.info(f"{item_log_prefix}: Item successfully processed and saved. Status: {item_status}")
|
|
# Populate final details for this item
|
|
final_details = {
|
|
"status": item_status,
|
|
"saved_files_info": saved_data.saved_files_details, # List of dicts from save util
|
|
"internal_map_type": internal_map_type,
|
|
"original_dimensions": processed_data.original_dimensions if isinstance(processed_data, ProcessedRegularMapData) else None,
|
|
"final_dimensions": scaled_data_output.final_dimensions if scaled_data_output else current_dimensions,
|
|
"transformations": processed_data.transformations_applied if isinstance(processed_data, ProcessedRegularMapData) else processed_data.transformations_applied_to_inputs,
|
|
# Add source file if regular map
|
|
"source_file": str(processed_data.source_file_path) if isinstance(processed_data, ProcessedRegularMapData) else None,
|
|
}
|
|
context.processed_maps_details[item_key] = final_details
|
|
else:
|
|
error_msg = saved_data.error_message if saved_data else "Save stage returned None"
|
|
log.error(f"{item_log_prefix}: Failed during save stage. Error: {error_msg}")
|
|
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Save Error: {error_msg}", "stage": "SaveVariantsStage"}
|
|
asset_had_item_errors = True
|
|
item_status = "Failed" # Ensure item status reflects failure
|
|
|
|
except Exception as e:
|
|
log.exception(f"{item_log_prefix}: Unhandled exception during item processing loop: {e}")
|
|
# Ensure details are recorded even on unhandled exception
|
|
if item_key is not None:
|
|
context.processed_maps_details[item_key] = {"status": "Failed", "notes": f"Unhandled Loop Error: {e}", "stage": "OrchestratorLoop"}
|
|
else:
|
|
log.error(f"Asset '{asset_name}': Unhandled exception in item loop before item key was set.")
|
|
asset_had_item_errors = True
|
|
item_status = "Failed"
|
|
# Optionally break loop or continue? Continue for now to process other items.
|
|
|
|
log.info("ORCHESTRATOR: Finished processing items loop for asset '%s'", asset_name)
|
|
log.info(f"Asset '{asset_name}': Finished core item processing loop.")
|
|
|
|
# --- Execute Post-Item-Processing Outer Stages ---
|
|
# (e.g., OutputOrganization, MetadataFinalizationSave)
|
|
# Identify which outer stages run after the item loop
|
|
# This needs better handling based on stage purpose. Assume none run after for now.
|
|
if not context.status_flags.get("asset_failed"):
|
|
log.info("ORCHESTRATOR: Executing post-item-processing outer stages for asset '%s'", asset_name)
|
|
context = self._execute_specific_stages(context, self.post_item_stages, "post-item", stop_on_skip=False)
|
|
|
|
# --- Final Asset Status Determination ---
|
|
final_asset_status = "Unknown"
|
|
fail_reason = ""
|
|
if context.status_flags.get("asset_failed"):
|
|
final_asset_status = "Failed"
|
|
fail_reason = f"(Failed in {context.status_flags.get('asset_failed_stage', 'Unknown Stage')}: {context.status_flags.get('asset_failed_reason', 'Unknown Reason')})"
|
|
elif context.status_flags.get("skip_asset"):
|
|
final_asset_status = "Skipped"
|
|
fail_reason = f"(Skipped: {context.status_flags.get('skip_reason', 'Unknown Reason')})"
|
|
elif asset_had_item_errors:
|
|
final_asset_status = "Failed"
|
|
fail_reason = "(One or more items failed)"
|
|
elif not context.processing_items:
|
|
# No items prepared, no errors -> consider skipped or processed based on definition?
|
|
final_asset_status = "Skipped" # Or "Processed (No Items)"
|
|
fail_reason = "(No items to process)"
|
|
elif not context.processed_maps_details and context.processing_items:
|
|
# Items were prepared, but none resulted in processed_maps_details entry
|
|
final_asset_status = "Skipped" # Or Failed?
|
|
fail_reason = "(All processing items skipped or failed internally)"
|
|
elif context.processed_maps_details:
|
|
# Check if all items in processed_maps_details are actually processed successfully
|
|
all_processed_ok = all(
|
|
str(details.get("status", "")).startswith("Processed")
|
|
for details in context.processed_maps_details.values()
|
|
)
|
|
some_processed_ok = any(
|
|
str(details.get("status", "")).startswith("Processed")
|
|
for details in context.processed_maps_details.values()
|
|
)
|
|
|
|
if all_processed_ok:
|
|
final_asset_status = "Processed"
|
|
elif some_processed_ok:
|
|
final_asset_status = "Partial" # Introduce a partial status? Or just Failed?
|
|
fail_reason = "(Some items failed)"
|
|
final_asset_status = "Failed" # Treat partial as Failed for overall status
|
|
else: # No items processed successfully
|
|
final_asset_status = "Failed"
|
|
fail_reason = "(All items failed)"
|
|
else:
|
|
# Should not happen if processing_items existed
|
|
final_asset_status = "Failed"
|
|
fail_reason = "(Unknown state after item processing)"
|
|
|
|
|
|
# Update overall status list
|
|
if final_asset_status == "Processed":
|
|
overall_status["processed"].append(asset_name)
|
|
elif final_asset_status == "Skipped":
|
|
overall_status["skipped"].append(f"{asset_name} {fail_reason}")
|
|
else: # Failed or Unknown
|
|
overall_status["failed"].append(f"{asset_name} {fail_reason}")
|
|
|
|
log.info(f"Asset '{asset_name}' final status: {final_asset_status} {fail_reason}")
|
|
# Clean up intermediate results for the asset to save memory
|
|
context.intermediate_results = {}
|
|
|
|
|
|
except Exception as e:
|
|
log.error(f"PipelineOrchestrator.process_source_rule failed critically: {e}", exc_info=True)
|
|
# Mark all assets from this source rule that weren't finished as failed
|
|
processed_or_skipped_or_failed = set(overall_status["processed"]) | \
|
|
set(name.split(" ")[0] for name in overall_status["skipped"]) | \
|
|
set(name.split(" ")[0] for name in overall_status["failed"])
|
|
for asset_rule in source_rule.assets:
|
|
if asset_rule.asset_name not in processed_or_skipped_or_failed:
|
|
overall_status["failed"].append(f"{asset_rule.asset_name} (Orchestrator Error: {e})")
|
|
finally:
|
|
# --- Cleanup Temporary Directory ---
|
|
if engine_temp_dir_path and engine_temp_dir_path.exists():
|
|
try:
|
|
log.debug(f"PipelineOrchestrator cleaning up temporary directory: {engine_temp_dir_path}")
|
|
shutil.rmtree(engine_temp_dir_path, ignore_errors=True)
|
|
except Exception as e:
|
|
log.error(f"Error cleaning up orchestrator temporary directory {engine_temp_dir_path}: {e}", exc_info=True)
|
|
|
|
return overall_status |