channel packing now works

2025-05-13 04:01:38 +02:00
parent 35a7221f57
commit f800bb25a9
4 changed files with 130 additions and 61 deletions


@@ -165,10 +165,12 @@ class PipelineOrchestrator:
         # --- Prepare Processing Items ---
         log.debug(f"Asset '{asset_name}': Preparing processing items...")
         try:
+            log.info(f"ORCHESTRATOR_TRACE: Asset '{asset_name}': Attempting to call _prepare_stage.execute(). Current context.status_flags: {context.status_flags}")
             # Prepare stage modifies context directly
             context = self._prepare_stage.execute(context)
+            log.info(f"ORCHESTRATOR_TRACE: Asset '{asset_name}': Successfully RETURNED from _prepare_stage.execute(). context.processing_items count: {len(context.processing_items) if context.processing_items is not None else 'None'}. context.status_flags: {context.status_flags}")
         except Exception as e:
-            log.error(f"Asset '{asset_name}': Error during PrepareProcessingItemsStage: {e}", exc_info=True)
+            log.error(f"ORCHESTRATOR_TRACE: Asset '{asset_name}': EXCEPTION during _prepare_stage.execute(): {e}", exc_info=True)
             context.status_flags["asset_failed"] = True
             context.status_flags["asset_failed_stage"] = "PrepareProcessingItemsStage"
             context.status_flags["asset_failed_reason"] = str(e)


@@ -13,9 +13,6 @@ from ...utils import image_processing_utils as ipu
 log = logging.getLogger(__name__)
-# Helper function (Duplicated from RegularMapProcessorStage - consider moving to utils)
 class MergedTaskProcessorStage(ProcessingStage):
     """
     Processes a single merge task defined in the configuration.
@@ -23,6 +20,42 @@ class MergedTaskProcessorStage(ProcessingStage):
     performs the merge, and returns the merged data.
     """
+    def _find_input_map_details_in_context(
+        self,
+        required_map_type: str,
+        processed_map_details_context: Dict[str, Dict[str, Any]],
+        log_prefix_for_find: str
+    ) -> Optional[Dict[str, Any]]:
+        """
+        Finds the details of a required input map from the context's processed_maps_details.
+        Prefers an exact match for full types (e.g. MAP_TYPE-1); for a base type (e.g. MAP_TYPE),
+        falls back to the primary suffixed variant (MAP_TYPE-1).
+        Returns the details dictionary for the found map only if it has saved_files_info.
+        """
+        # Try exact match first (e.g., rule asks for "MAP_NRM-1", or "MAP_NRM" if that's how it was processed)
+        for item_key, details in processed_map_details_context.items():
+            if details.get('internal_map_type') == required_map_type:
+                if details.get('saved_files_info') and isinstance(details['saved_files_info'], list) and len(details['saved_files_info']) > 0:
+                    log.debug(f"{log_prefix_for_find}: Found exact match for '{required_map_type}' with key '{item_key}'.")
+                    return details
+                log.warning(f"{log_prefix_for_find}: Found exact match for '{required_map_type}' (key '{item_key}') but no saved_files_info.")
+                return None # Found type but no usable files
+        # If no exact match and required_map_type is a base type (e.g. "MAP_NRM"),
+        # try the primary suffixed version "MAP_NRM-1".
+        if not re.search(r'-\d+$', required_map_type): # a base type like MAP_XXX
+            # Prefer "MAP_XXX-1" as the primary variant if suffixed types exist
+            primary_suffixed_type = f"{required_map_type}-1"
+            for item_key, details in processed_map_details_context.items():
+                if details.get('internal_map_type') == primary_suffixed_type:
+                    if details.get('saved_files_info') and isinstance(details['saved_files_info'], list) and len(details['saved_files_info']) > 0:
+                        log.debug(f"{log_prefix_for_find}: Found primary suffixed match '{primary_suffixed_type}' for base '{required_map_type}' with key '{item_key}'.")
+                        return details
+                    log.warning(f"{log_prefix_for_find}: Found primary suffixed match '{primary_suffixed_type}' (key '{item_key}') but no saved_files_info.")
+                    return None # Found type but no usable files
+        log.debug(f"{log_prefix_for_find}: No suitable match found for '{required_map_type}' via exact or primary suffixed type search.")
+        return None

     def execute(
         self,
         context: AssetProcessingContext,
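For reference, the lookup order the new helper implements, restated as a self-contained sketch (the processed_maps_details structure shown is an assumption built only from the fields the diff reads):

import re
from typing import Any, Dict, Optional

def find_input_map(required: str, processed: Dict[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    # Pass 1: exact match on internal_map_type (e.g. "MAP_NRM-1").
    for details in processed.values():
        if details.get("internal_map_type") == required:
            return details if details.get("saved_files_info") else None
    # Pass 2: a base type like "MAP_NRM" falls back to its primary "-1" variant.
    if not re.search(r"-\d+$", required):
        primary = f"{required}-1"
        for details in processed.values():
            if details.get("internal_map_type") == primary:
                return details if details.get("saved_files_info") else None
    return None

processed = {
    "item_nrm": {"internal_map_type": "MAP_NRM-1",
                 "saved_files_info": [{"path": "/out/asset_nrm.png", "bit_depth": 8}]},
}
assert find_input_map("MAP_NRM-1", processed) is not None  # exact match
assert find_input_map("MAP_NRM", processed) is not None    # base type resolves via "-1"
assert find_input_map("MAP_NRM-2", processed) is None      # unknown suffix stays unresolved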
@@ -56,17 +89,23 @@ class MergedTaskProcessorStage(ProcessingStage):
         merge_dimension_mismatch_strategy = getattr(config, "MERGE_DIMENSION_MISMATCH_STRATEGY", "USE_LARGEST")
         workspace_path = context.workspace_path # Base for resolving relative input paths
-        merge_rule_config = task_data.get('merge_rule_config', {})
-        input_map_sources_from_task = task_data.get('input_map_sources', {}) # Info about where inputs come from
-        target_dimensions_hw = task_data.get('source_dimensions') # Expected dimensions (h, w) from previous stage
-        merge_inputs_config = merge_rule_config.get('inputs', {}) # e.g., {'R': 'MAP_AO', 'G': 'MAP_ROUGH', ...}
-        merge_defaults = merge_rule_config.get('defaults', {}) # e.g., {'R': 255, 'G': 255, ...}
-        merge_channels_order = merge_rule_config.get('channel_order', 'RGB') # e.g., 'RGB', 'RGBA'
+        # input_map_sources_from_task is no longer used for paths. Paths are sourced from context.processed_maps_details.
+        target_dimensions_hw = task_data.get('source_dimensions') # Expected dimensions (h, w) for fallback creation, must be in config.
+        merge_inputs_config = task_data.get('inputs', {}) # e.g., {'R': 'MAP_AO', 'G': 'MAP_ROUGH', ...}
+        merge_defaults = task_data.get('defaults', {}) # e.g., {'R': 255, 'G': 255, ...}
+        merge_channels_order = task_data.get('channel_order', 'RGB') # e.g., 'RGB', 'RGBA'
-        if not merge_rule_config or not input_map_sources_from_task or not target_dimensions_hw or not merge_inputs_config:
-            result.error_message = "Merge task data is incomplete (missing config, sources, dimensions, or input mapping)."
         # Target dimensions are crucial if fallbacks are needed.
-        # Merge inputs config is essential.
+        # Merge inputs config is essential. Check directly in task_data.
+        inputs_from_task_data = task_data.get('inputs')
+        if not isinstance(inputs_from_task_data, dict) or not inputs_from_task_data:
+            result.error_message = "Merge task data is incomplete (missing or invalid 'inputs' dictionary in task_data)."
             log.error(f"{log_prefix}: {result.error_message}")
             return result
+        if not target_dimensions_hw and any(merge_defaults.get(ch) is not None for ch in merge_inputs_config.keys()):
+            log.warning(f"{log_prefix}: Merge task has defaults defined, but 'source_dimensions' (target_dimensions_hw) is missing in task_data. Fallback image creation might fail if needed.")
+            # Not returning error yet, as fallbacks might not be triggered.
         loaded_inputs_for_merge: Dict[str, np.ndarray] = {} # Channel char -> image data
         actual_input_dimensions: List[Tuple[int, int]] = [] # List of (h, w) for loaded files
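The net effect of this hunk is that 'inputs', 'defaults', and 'channel_order' are now read from task_data directly rather than from a nested merge_rule_config. A hedged sketch of the task_data shape the stage now expects (field names follow the diff; the concrete values are illustrative only):

task_data = {
    "output_map_type": "MAP_ORM",                                  # illustrative output type
    "inputs": {"R": "MAP_AO", "G": "MAP_ROUGH", "B": "MAP_METAL"},
    "defaults": {"R": 255, "G": 128, "B": 0},                      # per-channel fallback fill values
    "channel_order": "RGB",
    "source_dimensions": (2048, 2048),                             # (h, w), needed if a fallback must be created
}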
@@ -85,46 +124,61 @@ class MergedTaskProcessorStage(ProcessingStage):
log.error(f"{log_prefix}: {result.error_message}")
return result # Fail the task if an input type is invalid
input_info = input_map_sources_from_task.get(required_map_type_from_rule)
input_image_data: Optional[np.ndarray] = None
input_source_desc = f"Fallback for {required_map_type_from_rule}"
input_log_prefix = f"{log_prefix}, Input '{required_map_type_from_rule}' (Channel '{channel_char}')"
channel_transform_notes: List[str] = []
# 1. Attempt to load from file path
if input_info and input_info.get('file_path'):
# Paths in merged tasks should be relative to workspace_path
input_file_path_str = input_info['file_path']
input_file_path = workspace_path / input_file_path_str
if input_file_path.is_file():
try:
input_image_data = ipu.load_image(str(input_file_path))
if input_image_data is not None:
log.info(f"{input_log_prefix}: Loaded from: {input_file_path}")
actual_input_dimensions.append(input_image_data.shape[:2]) # (h, w)
input_source_desc = str(input_file_path)
try:
input_source_bit_depths[channel_char] = ipu.get_image_bit_depth(str(input_file_path))
except Exception:
log.warning(f"{input_log_prefix}: Could not get bit depth for {input_file_path}. Defaulting to 8.")
input_source_bit_depths[channel_char] = 8
else:
log.warning(f"{input_log_prefix}: Failed to load image from {input_file_path}. Attempting fallback.")
except Exception as e:
log.warning(f"{input_log_prefix}: Error loading image from {input_file_path}: {e}. Attempting fallback.")
# 1. Attempt to load from context.processed_maps_details
found_input_map_details = self._find_input_map_details_in_context(
required_map_type_from_rule, context.processed_maps_details, input_log_prefix
)
if found_input_map_details:
# Assuming the first saved file is the primary one for merging.
# This might need refinement if specific variants (resolutions/formats) are required.
primary_saved_file_info = found_input_map_details['saved_files_info'][0]
input_file_path_str = primary_saved_file_info.get('path')
if input_file_path_str:
input_file_path = Path(input_file_path_str) # Path is absolute from SaveVariantsStage
if input_file_path.is_file():
try:
input_image_data = ipu.load_image(str(input_file_path))
if input_image_data is not None:
log.info(f"{input_log_prefix}: Loaded from context: {input_file_path}")
actual_input_dimensions.append(input_image_data.shape[:2]) # (h, w)
input_source_desc = str(input_file_path)
# Bit depth from the saved variant info
input_source_bit_depths[channel_char] = primary_saved_file_info.get('bit_depth', 8)
else:
log.warning(f"{input_log_prefix}: Failed to load image from {input_file_path} (found in context). Attempting fallback.")
input_image_data = None # Ensure fallback is triggered
except Exception as e:
log.warning(f"{input_log_prefix}: Error loading image from {input_file_path} (found in context): {e}. Attempting fallback.")
input_image_data = None # Ensure fallback is triggered
else:
log.warning(f"{input_log_prefix}: Input file path '{input_file_path}' (from context) not found. Attempting fallback.")
input_image_data = None # Ensure fallback is triggered
else:
log.warning(f"{input_log_prefix}: Input file path not found: {input_file_path}. Attempting fallback.")
log.warning(f"{input_log_prefix}: Found map type '{required_map_type_from_rule}' in context, but 'path' is missing in saved_files_info. Attempting fallback.")
input_image_data = None # Ensure fallback is triggered
else:
log.warning(f"{input_log_prefix}: No file path provided. Attempting fallback.")
log.info(f"{input_log_prefix}: Input map type '{required_map_type_from_rule}' not found in context.processed_maps_details. Attempting fallback.")
input_image_data = None # Ensure fallback is triggered
# 2. Apply Fallback if needed
if input_image_data is None:
fallback_value = merge_defaults.get(channel_char)
if fallback_value is not None:
try:
if not target_dimensions_hw:
result.error_message = f"Cannot create fallback for channel '{channel_char}': 'source_dimensions' (target_dimensions_hw) not defined in task_data."
log.error(f"{log_prefix}: {result.error_message}")
return result # Critical failure if dimensions for fallback are missing
h, w = target_dimensions_hw
# Infer shape/dtype for fallback (simplified)
num_channels = 1 if isinstance(fallback_value, (int, float)) else len(fallback_value) if isinstance(fallback_value, (list, tuple)) else 1 # Default to 1 channel? Needs refinement.
num_channels = 1 if isinstance(fallback_value, (int, float)) else len(fallback_value) if isinstance(fallback_value, (list, tuple)) else 1
dtype = np.uint8 # Default dtype
shape = (h, w) if num_channels == 1 else (h, w, num_channels)
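A minimal sketch of the fallback step above: a constant image at the target (h, w), single-channel for scalar defaults or N-channel for list/tuple defaults, using the same uint8 default dtype the stage assumes:

import numpy as np

def make_fallback(fallback_value, target_hw):
    h, w = target_hw
    if isinstance(fallback_value, (list, tuple)):
        shape = (h, w, len(fallback_value))  # one plane per component
    else:
        shape = (h, w)                       # scalar default -> single channel
    return np.full(shape, fallback_value, dtype=np.uint8)

gray = make_fallback(128, (512, 512))                      # shape (512, 512)
flat_normal = make_fallback([128, 128, 255], (512, 512))   # shape (512, 512, 3)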
@@ -199,9 +253,20 @@ class MergedTaskProcessorStage(ProcessingStage):
                 loaded_inputs_for_merge[channel_char] = resized_img
                 log.debug(f"{log_prefix}: Resized input for channel '{channel_char}'.")
+        # If target_merge_dims_hw is still None (no source_dimensions and no mismatch), use first loaded input's dimensions
+        if target_merge_dims_hw is None and actual_input_dimensions:
+            target_merge_dims_hw = actual_input_dimensions[0]
+            log.info(f"{log_prefix}: Using dimensions from first loaded input: {target_merge_dims_hw}")
         # --- Perform Merge ---
         log.debug(f"{log_prefix}: Performing merge operation for channels '{merge_channels_order}'.")
         try:
+            # Final check for valid dimensions before unpacking
+            if not isinstance(target_merge_dims_hw, tuple) or len(target_merge_dims_hw) != 2:
+                result.error_message = "Could not determine valid target dimensions for merge operation."
+                log.error(f"{log_prefix}: {result.error_message} (target_merge_dims_hw: {target_merge_dims_hw})")
+                return result
             output_channels = len(merge_channels_order)
             h, w = target_merge_dims_hw # Use the potentially adjusted dimensions
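The merge itself amounts to stacking one single-channel plane per letter of channel_order into an (h, w, N) array. A minimal numpy sketch, assuming every input has already been loaded or fallback-created and resized to the same (h, w):

import numpy as np

def pack_channels(inputs, channel_order):
    planes = []
    for ch in channel_order:            # e.g. "R", "G", "B", "A"
        plane = inputs[ch]
        if plane.ndim == 3:             # multi-channel input: take the first plane
            plane = plane[..., 0]
        planes.append(plane.astype(np.uint8))
    return np.stack(planes, axis=-1)    # shape (h, w, len(channel_order))

h, w = 4, 4
merged = pack_channels(
    {"R": np.full((h, w), 200, np.uint8),
     "G": np.full((h, w), 100, np.uint8),
     "B": np.zeros((h, w), np.uint8)},
    "RGB",
)
assert merged.shape == (h, w, 3)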


@@ -60,23 +60,24 @@ class PrepareProcessingItemsStage(ProcessingStage):
         # merged_image_tasks are expected to be loaded into context.config_obj
         # by the Configuration class from app_settings.json.
-        merged_tasks_list = getattr(context.config_obj, 'merged_image_tasks', None)
+        merged_tasks_list = getattr(context.config_obj, 'map_merge_rules', None)
         if merged_tasks_list and isinstance(merged_tasks_list, list):
             log.debug(f"Asset '{asset_name_for_log}': Found {len(merged_tasks_list)} merge tasks in global config.")
-            for task_idx, task_data in enumerate(merged_tasks_list):
+            for task_idx, task_data in enumerate(merged_tasks_list):
-                if isinstance(task_data, dict):
-                    task_key = f"merged_task_{task_idx}"
-                    # Basic validation for merge task data (can be expanded)
-                    if not task_data.get('output_map_type') or not task_data.get('merge_rule_config'):
-                        log.warning(f"Asset '{asset_name_for_log}', Task Index {task_idx}: Skipping merge task due to missing 'output_map_type' or 'merge_rule_config'. Task data: {task_data}")
-                        continue # Skip this specific task
-                    merge_def = MergeTaskDefinition(task_data=task_data, task_key=task_key)
-                    log.info(f"Asset '{asset_name_for_log}': Identified and adding Merge Task: Key='{merge_def.task_key}', OutputType='{task_data.get('output_map_type', 'N/A')}'")
-                    items_to_process.append(merge_def)
-                else:
-                    log.warning(f"Asset '{asset_name_for_log}': Item at index {task_idx} in config_obj.merged_image_tasks is not a dictionary. Skipping. Item: {task_data}")
+                if isinstance(task_data, dict):
+                    task_key = f"merged_task_{task_idx}"
+                    # Basic validation for merge task data: requires output_map_type and an inputs dictionary
+                    if not task_data.get('output_map_type') or not isinstance(task_data.get('inputs'), dict):
+                        log.warning(f"Asset '{asset_name_for_log}', Task Index {task_idx}: Skipping merge task due to missing 'output_map_type' or valid 'inputs' dictionary. Task data: {task_data}")
+                        continue # Skip this specific task
+                    log.debug(f"Asset '{asset_name_for_log}', Preparing Merge Task Index {task_idx}: Raw task_data: {task_data}")
+                    merge_def = MergeTaskDefinition(task_data=task_data, task_key=task_key)
+                    log.debug(f"Asset '{asset_name_for_log}': Created MergeTaskDefinition object: {merge_def}")
+                    log.info(f"Asset '{asset_name_for_log}': Successfully CREATED MergeTaskDefinition: Key='{merge_def.task_key}', OutputType='{merge_def.task_data.get('output_map_type', 'N/A')}'")
+                    items_to_process.append(merge_def)
+                else:
+                    log.warning(f"Asset '{asset_name_for_log}': Item at index {task_idx} in config_obj.merged_image_tasks is not a dictionary. Skipping. Item: {task_data}")
+            # The log for "Added X potential MergeTaskDefinition items" will be covered by the final log.
         elif merged_tasks_list is None:
             log.debug(f"Asset '{asset_name_for_log}': 'merged_image_tasks' not found in config_obj. No global merge tasks to add.")
@@ -89,6 +90,7 @@ class PrepareProcessingItemsStage(ProcessingStage):
         if not items_to_process:
             log.info(f"Asset '{asset_name_for_log}': No valid items found to process after preparation.")
+        log.debug(f"Asset '{asset_name_for_log}': Final items_to_process before assigning to context: {items_to_process}")
         context.processing_items = items_to_process
         context.intermediate_results = {} # Initialize intermediate results storage
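MergeTaskDefinition itself is defined elsewhere in the project; for reading these hunks, a hypothetical stand-in covering the two fields the stages actually use would look like:

from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class MergeTaskDefinition:
    task_data: Dict[str, Any]  # raw rule dict from map_merge_rules
    task_key: str              # e.g. "merged_task_0"

item = MergeTaskDefinition(
    task_data={"output_map_type": "MAP_ORM", "inputs": {"R": "MAP_AO"}},
    task_key="merged_task_0",
)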