# processing_engine.py import os import math import shutil import tempfile import logging import json import re import time from pathlib import Path from typing import List, Dict, Tuple, Optional, Set from collections import defaultdict # Attempt to import image processing libraries try: import cv2 import numpy as np except ImportError: print("ERROR: Missing required image processing libraries. Please install opencv-python and numpy:") print("pip install opencv-python numpy") # Allow import to fail but log error; execution will likely fail later cv2 = None np = None # Attempt to import OpenEXR - Check if needed for advanced EXR flags/types try: import OpenEXR import Imath _HAS_OPENEXR = True except ImportError: _HAS_OPENEXR = False # Log this information - basic EXR might still work via OpenCV logging.debug("Optional 'OpenEXR' python package not found. EXR saving relies on OpenCV's built-in support.") # Import project-specific modules try: from configuration import Configuration, ConfigurationError from rule_structure import SourceRule, AssetRule, FileRule # Import necessary structures except ImportError: print("ERROR: Cannot import Configuration or rule_structure classes.") print("Ensure configuration.py and rule_structure.py are in the same directory or Python path.") # Allow import to fail but log error; execution will likely fail later Configuration = None SourceRule = None AssetRule = None FileRule = None # Use logger defined in main.py (or configure one here if run standalone) log = logging.getLogger(__name__) # Basic config if logger hasn't been set up elsewhere (e.g., during testing) if not log.hasHandlers(): logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') # --- Custom Exception --- class ProcessingEngineError(Exception): """Custom exception for errors during processing engine operations.""" pass # --- Helper Functions (Moved from AssetProcessor or kept static) --- def _is_power_of_two(n: int) -> bool: """Checks if 
a number is a power of two.""" return (n > 0) and (n & (n - 1) == 0) def get_nearest_pot(value: int) -> int: """Finds the nearest power of two to the given value.""" if value <= 0: return 1 # Or raise error, POT must be positive if _is_power_of_two(value): return value # Calculate the powers of two below and above the value lower_pot = 1 << (value.bit_length() - 1) upper_pot = 1 << value.bit_length() # Determine which power of two is closer if (value - lower_pot) < (upper_pot - value): return lower_pot else: return upper_pot def calculate_target_dimensions(orig_w, orig_h, target_max_dim) -> tuple[int, int]: """ Calculates target dimensions by first scaling to fit target_max_dim while maintaining aspect ratio, then finding the nearest power-of-two value for each resulting dimension (Stretch/Squash to POT). """ if orig_w <= 0 or orig_h <= 0: # Fallback to target_max_dim if original dimensions are invalid pot_dim = get_nearest_pot(target_max_dim) log.warning(f"Invalid original dimensions ({orig_w}x{orig_h}). Falling back to nearest POT of target_max_dim: {pot_dim}x{pot_dim}") return (pot_dim, pot_dim) # Step 1: Calculate intermediate dimensions maintaining aspect ratio ratio = orig_w / orig_h if ratio > 1: # Width is dominant scaled_w = target_max_dim scaled_h = max(1, round(scaled_w / ratio)) else: # Height is dominant or square scaled_h = target_max_dim scaled_w = max(1, round(scaled_h * ratio)) # Step 2: Find the nearest power of two for each scaled dimension pot_w = get_nearest_pot(scaled_w) pot_h = get_nearest_pot(scaled_h) log.debug(f"POT Calc: Orig=({orig_w}x{orig_h}), MaxDim={target_max_dim} -> Scaled=({scaled_w}x{scaled_h}) -> POT=({pot_w}x{pot_h})") return int(pot_w), int(pot_h) def _calculate_image_stats(image_data: np.ndarray) -> dict | None: """ Calculates min, max, mean for a given numpy image array. Handles grayscale and multi-channel images. Converts to float64 for calculation. 
""" if image_data is None: log.warning("Attempted to calculate stats on None image data.") return None if np is None: log.error("Numpy not available for stats calculation.") return None try: # Use float64 for calculations to avoid potential overflow/precision issues data_float = image_data.astype(np.float64) # Normalize data_float based on original dtype before calculating stats if image_data.dtype == np.uint16: log.debug("Stats calculation: Normalizing uint16 data to 0-1 range.") data_float /= 65535.0 elif image_data.dtype == np.uint8: log.debug("Stats calculation: Normalizing uint8 data to 0-1 range.") data_float /= 255.0 # Assuming float inputs are already in 0-1 range or similar log.debug(f"Stats calculation: data_float dtype: {data_float.dtype}, shape: {data_float.shape}") # Log a few sample values to check range after normalization if data_float.size > 0: sample_values = data_float.flatten()[:10] # Get first 10 values log.debug(f"Stats calculation: Sample values (first 10) after normalization: {sample_values.tolist()}") if len(data_float.shape) == 2: # Grayscale (H, W) min_val = float(np.min(data_float)) max_val = float(np.max(data_float)) mean_val = float(np.mean(data_float)) stats = {"min": min_val, "max": max_val, "mean": mean_val} log.debug(f"Calculated Grayscale Stats: Min={min_val:.4f}, Max={max_val:.4f}, Mean={mean_val:.4f}") elif len(data_float.shape) == 3: # Color (H, W, C) channels = data_float.shape[2] min_val = [float(v) for v in np.min(data_float, axis=(0, 1))] max_val = [float(v) for v in np.max(data_float, axis=(0, 1))] mean_val = [float(v) for v in np.mean(data_float, axis=(0, 1))] # Assume data is RGB order after potential conversion in _load_and_transform_source stats = {"min": min_val, "max": max_val, "mean": mean_val} log.debug(f"Calculated {channels}-Channel Stats (RGB order): Min={min_val}, Max={max_val}, Mean={mean_val}") else: log.warning(f"Cannot calculate stats for image with unsupported shape {data_float.shape}") return None return 
stats except Exception as e: log.error(f"Error calculating image stats: {e}", exc_info=True) # Log exception info return {"error": str(e)} def _get_base_map_type(target_map_string: str) -> str: """Extracts the base map type (e.g., 'COL') from a potentially numbered string ('COL-1').""" match = re.match(r"([a-zA-Z]+)", target_map_string) if match: return match.group(1).upper() return target_map_string.upper() # Fallback if no number suffix def _sanitize_filename(name: str) -> str: """Removes or replaces characters invalid for filenames/directory names.""" if not isinstance(name, str): name = str(name) name = re.sub(r'[^\w.\-]+', '_', name) # Allow alphanumeric, underscore, hyphen, dot name = re.sub(r'_+', '_', name) name = name.strip('_') if not name: name = "invalid_name" return name def _normalize_aspect_ratio_change(original_width, original_height, resized_width, resized_height, decimals=2): """ Calculates the aspect ratio change string (e.g., "EVEN", "X133"). Returns the string representation. 
""" if original_width <= 0 or original_height <= 0: log.warning("Cannot calculate aspect ratio change with zero original dimensions.") return "InvalidInput" # Avoid division by zero if resize resulted in zero dimensions (shouldn't happen with checks) if resized_width <= 0 or resized_height <= 0: log.warning("Cannot calculate aspect ratio change with zero resized dimensions.") return "InvalidResize" # Original logic from user feedback width_change_percentage = ((resized_width - original_width) / original_width) * 100 height_change_percentage = ((resized_height - original_height) / original_height) * 100 normalized_width_change = width_change_percentage / 100 normalized_height_change = height_change_percentage / 100 normalized_width_change = min(max(normalized_width_change + 1, 0), 2) normalized_height_change = min(max(normalized_height_change + 1, 0), 2) # Handle potential zero division if one dimension change is exactly -100% (normalized to 0) # If both are 0, aspect ratio is maintained. If one is 0, the other dominates. 
if normalized_width_change == 0 and normalized_height_change == 0: closest_value_to_one = 1.0 # Avoid division by zero, effectively scale_factor = 1 elif normalized_width_change == 0: closest_value_to_one = abs(normalized_height_change) elif normalized_height_change == 0: closest_value_to_one = abs(normalized_width_change) else: closest_value_to_one = min(abs(normalized_width_change), abs(normalized_height_change)) # Add a small epsilon to avoid division by zero if closest_value_to_one is extremely close to 0 epsilon = 1e-9 scale_factor = 1 / (closest_value_to_one + epsilon) if abs(closest_value_to_one) < epsilon else 1 / closest_value_to_one scaled_normalized_width_change = scale_factor * normalized_width_change scaled_normalized_height_change = scale_factor * normalized_height_change output_width = round(scaled_normalized_width_change, decimals) output_height = round(scaled_normalized_height_change, decimals) # Convert to int if exactly 1.0 after rounding if abs(output_width - 1.0) < epsilon: output_width = 1 if abs(output_height - 1.0) < epsilon: output_height = 1 # Determine output string if original_width == original_height or abs(output_width - output_height) < epsilon: output = "EVEN" elif output_width != 1 and output_height == 1: output = f"X{str(output_width).replace('.', '')}" elif output_height != 1 and output_width == 1: output = f"Y{str(output_height).replace('.', '')}" else: # Both changed relative to each other output = f"X{str(output_width).replace('.', '')}Y{str(output_height).replace('.', '')}" log.debug(f"Aspect ratio change calculated: Orig=({original_width}x{original_height}), Resized=({resized_width}x{resized_height}) -> String='{output}'") return output # --- Processing Engine Class --- class ProcessingEngine: """ Handles the core processing pipeline for assets based on explicit rules provided in a SourceRule object and static configuration. It does not perform classification, prediction, or rule fallback internally. 
class ProcessingEngine:
    """
    Handles the core processing pipeline for assets based on explicit rules
    provided in a SourceRule object and static configuration.
    It does not perform classification, prediction, or rule fallback internally.
    """

    # Define the list of known grayscale map types (adjust as needed)
    # This comes from static knowledge/config, not dynamic rules.
    GRAYSCALE_MAP_TYPES = ['HEIGHT', 'ROUGH', 'METAL', 'AO', 'OPC', 'MASK']

    def __init__(self, config_obj: "Configuration"):
        """
        Initializes the processing engine with static configuration.

        Args:
            config_obj: The loaded Configuration object containing static settings.

        Raises:
            ProcessingEngineError: If required libraries/classes failed to import
                or config_obj is not a Configuration instance.
        """
        # Forward-ref annotations above: Configuration/SourceRule may be None
        # when the guarded module imports failed, so annotations must not be
        # evaluated eagerly.
        if cv2 is None or np is None or Configuration is None or SourceRule is None:
            raise ProcessingEngineError("Essential libraries (OpenCV, NumPy) or classes (Configuration, SourceRule) are not available.")
        if not isinstance(config_obj, Configuration):
            raise ProcessingEngineError("config_obj must be a valid Configuration object.")
        self.config_obj: "Configuration" = config_obj
        self.temp_dir: Optional[Path] = None  # Path to the temporary working directory for a process run
        self.loaded_data_cache: dict = {}  # Cache for loaded/resized data within a single process call
        log.debug("ProcessingEngine initialized.")

    def process(self, source_rule: "SourceRule", workspace_path: Path, output_base_path: Path, overwrite: bool = False) -> Dict[str, List[str]]:
        """
        Executes the processing pipeline for all assets defined in the SourceRule.

        Args:
            source_rule: The SourceRule object containing explicit instructions for all assets and files.
            workspace_path: The path to the directory containing the source files (e.g., extracted archive).
            output_base_path: The base directory where processed output will be saved.
            overwrite: If True, forces reprocessing even if output exists for an asset.

        Returns:
            Dict[str, List[str]]: A dictionary summarizing the status of each asset:
                {"processed": [asset_name1, ...], "skipped": [asset_name2, ...], "failed": [asset_name3, ...]}
        """
        log.info(f"VERIFY: ProcessingEngine.process called with rule for input: {source_rule.input_path}")  # DEBUG Verify
        log.debug(f" VERIFY Rule Details: {source_rule}")  # DEBUG Verify (Optional detailed log)
        if not isinstance(source_rule, SourceRule):
            raise ProcessingEngineError("process() requires a valid SourceRule object.")
        if not isinstance(workspace_path, Path) or not workspace_path.is_dir():
            raise ProcessingEngineError(f"Invalid workspace path provided: {workspace_path}")
        if not isinstance(output_base_path, Path):
            raise ProcessingEngineError(f"Invalid output base path provided: {output_base_path}")

        log.info(f"ProcessingEngine starting process for {len(source_rule.assets)} asset(s) defined in SourceRule.")
        overall_status = {"processed": [], "skipped": [], "failed": []}
        self.loaded_data_cache = {}  # Reset cache for this run

        # Use a temporary directory for intermediate files (like saved maps)
        try:
            self.temp_dir = Path(tempfile.mkdtemp(prefix=self.config_obj.temp_dir_prefix))
            log.debug(f"Created temporary workspace for engine: {self.temp_dir}")

            # --- Loop through each asset defined in the SourceRule ---
            for asset_rule in source_rule.assets:
                asset_name = asset_rule.asset_name
                log.info(f"--- Processing asset: '{asset_name}' ---")
                temp_metadata_path_asset = None  # Track metadata file for this asset
                try:
                    # --- Skip Check ---
                    # Use static config for supplier name and metadata filename
                    supplier_sanitized = _sanitize_filename(self.config_obj.supplier_name)
                    asset_name_sanitized = _sanitize_filename(asset_name)
                    final_dir = output_base_path / supplier_sanitized / asset_name_sanitized
                    metadata_file_path = final_dir / self.config_obj.metadata_filename
                    # BUGFIX: previously only final_dir.exists() was checked (the
                    # computed metadata_file_path was unused), so a partially-written
                    # output directory with no metadata was wrongly skipped.
                    if not overwrite and final_dir.exists() and metadata_file_path.exists():
                        log.info(f"Output directory and metadata found for asset '{asset_name_sanitized}' and overwrite is False. Skipping.")
                        overall_status["skipped"].append(asset_name)
                        continue  # Skip to the next asset
                    elif overwrite and final_dir.exists():
                        log.warning(f"Output directory exists for '{asset_name_sanitized}' and overwrite is True. Removing existing directory: {final_dir}")
                        try:
                            shutil.rmtree(final_dir)
                        except Exception as rm_err:
                            raise ProcessingEngineError(f"Failed to remove existing output directory {final_dir} during overwrite: {rm_err}") from rm_err

                    # --- Prepare Asset Metadata ---
                    # Start with common metadata from the rule, add asset name
                    current_asset_metadata = asset_rule.common_metadata.copy()
                    current_asset_metadata["asset_name"] = asset_name
                    # Add other fields that will be populated
                    current_asset_metadata["maps_present"] = []
                    current_asset_metadata["merged_maps"] = []
                    current_asset_metadata["shader_features"] = []
                    current_asset_metadata["source_files_in_extra"] = []
                    current_asset_metadata["image_stats_1k"] = {}
                    current_asset_metadata["map_details"] = {}
                    current_asset_metadata["aspect_ratio_change_string"] = "N/A"
                    current_asset_metadata["merged_map_channel_stats"] = {}  # Initialize for stats

                    # --- Process Individual Maps ---
                    # Stats and aspect ratio are written into current_asset_metadata
                    # by the helper; map_details are also updated in place.
                    processed_maps_details_asset, image_stats_asset, aspect_ratio_change_string_asset = self._process_individual_maps(
                        asset_rule=asset_rule,
                        workspace_path=workspace_path,  # Use the workspace path received by process() (contains prepared files)
                        current_asset_metadata=current_asset_metadata  # Pass mutable dict
                    )

                    # --- Merge Maps ---
                    merged_maps_details_asset = self._merge_maps(
                        asset_rule=asset_rule,
                        workspace_path=workspace_path,
                        processed_maps_details_asset=processed_maps_details_asset,  # Needed to find resolutions
                        current_asset_metadata=current_asset_metadata  # Pass mutable dict for stats
                    )

                    # --- Generate Metadata ---
                    temp_metadata_path_asset = self._generate_metadata_file(
                        source_rule=source_rule,  # Pass the parent SourceRule
                        asset_rule=asset_rule,
                        current_asset_metadata=current_asset_metadata,  # Pass the populated dict
                        processed_maps_details_asset=processed_maps_details_asset,
                        merged_maps_details_asset=merged_maps_details_asset
                    )

                    # --- Organize Output ---
                    self._organize_output_files(
                        asset_rule=asset_rule,
                        workspace_path=workspace_path,  # Pass the original workspace path
                        supplier_identifier=source_rule.supplier_identifier,  # Pass supplier from SourceRule
                        output_base_path=output_base_path,  # Pass output path
                        processed_maps_details_asset=processed_maps_details_asset,
                        merged_maps_details_asset=merged_maps_details_asset,
                        temp_metadata_path=temp_metadata_path_asset
                    )

                    log.info(f"--- Asset '{asset_name}' processed successfully. ---")
                    overall_status["processed"].append(asset_name)
                except Exception as asset_err:
                    log.error(f"--- Failed processing asset '{asset_name}': {asset_err} ---", exc_info=True)
                    overall_status["failed"].append(asset_name)
                    # Continue to the next asset

            log.info(f"ProcessingEngine finished. Summary: {overall_status}")
            return overall_status
        except Exception as e:
            log.exception(f"Processing engine failed unexpectedly: {e}")
            # Ensure all assets not processed/skipped are marked as failed
            processed_or_skipped = set(overall_status["processed"] + overall_status["skipped"])
            for asset_rule in source_rule.assets:
                if asset_rule.asset_name not in processed_or_skipped:
                    overall_status["failed"].append(asset_rule.asset_name)
            return overall_status  # Return partial status if possible
        finally:
            self._cleanup_workspace()

    def _setup_workspace(self):
        """Creates a temporary directory for processing."""
        # This is now handled within the process method to ensure it's created per run.
        # Kept as a placeholder if needed later, but currently unused.
        pass

    def _cleanup_workspace(self):
        """Removes the temporary workspace directory if it exists."""
        if self.temp_dir and self.temp_dir.exists():
            try:
                log.debug(f"Cleaning up engine temporary workspace: {self.temp_dir}")
                # Ignore errors during cleanup (e.g., permission errors on copied .git files)
                shutil.rmtree(self.temp_dir, ignore_errors=True)
                self.temp_dir = None
                log.debug("Engine temporary workspace cleaned up successfully.")
            except Exception as e:
                log.error(f"Failed to remove engine temporary workspace {self.temp_dir}: {e}", exc_info=True)
        self.loaded_data_cache = {}  # Clear cache after cleanup

    def _load_and_transform_source(self, source_path_abs: Path, map_type: str,
                                   target_resolution_key: str,
                                   is_gloss_source: bool) -> "Tuple[Optional[np.ndarray], Optional[np.dtype]]":
        """
        Loads a source image file, performs initial prep (BGR->RGB, Gloss->Rough),
        resizes it to the target resolution, and caches the result.
        Uses static configuration from self.config_obj.

        Args:
            source_path_abs: Absolute path to the source file in the workspace.
            map_type: The standard map type (e.g., "NRM", "ROUGH", "ROUGH-1").
            target_resolution_key: The key for the target resolution (e.g., "4K").
            is_gloss_source: Boolean indicating if this source should be treated
                as gloss for inversion.

        Returns:
            Tuple containing:
            - Resized NumPy array (float32 for gloss-inverted, original type
              otherwise) or None if loading/processing fails.
            - Original source NumPy dtype or None if loading fails.
        """
        if cv2 is None or np is None:
            log.error("OpenCV or NumPy not available for image loading.")
            return None, None

        cache_key = (source_path_abs, target_resolution_key)  # Use absolute path for cache key
        if cache_key in self.loaded_data_cache:
            log.debug(f"CACHE HIT: Returning cached data for {source_path_abs.name} at {target_resolution_key}")
            return self.loaded_data_cache[cache_key]  # Return tuple (image_data, source_dtype)

        log.debug(f"CACHE MISS: Loading and transforming {source_path_abs.name} for {target_resolution_key}")
        img_prepared = None
        source_dtype = None
        try:
            # --- 1. Load Source Image ---
            # Determine read flag (Grayscale for specific types, unchanged otherwise)
            # Use static GRAYSCALE_MAP_TYPES list
            base_map_type = _get_base_map_type(map_type)  # Get base type (e.g., ROUGH from ROUGH-1)
            read_flag = cv2.IMREAD_GRAYSCALE if base_map_type in self.GRAYSCALE_MAP_TYPES else cv2.IMREAD_UNCHANGED
            # Special case for MASK: always load unchanged first to check alpha
            if base_map_type == 'MASK':
                read_flag = cv2.IMREAD_UNCHANGED
            log.debug(f"Loading source {source_path_abs.name} with flag: {'GRAYSCALE' if read_flag == cv2.IMREAD_GRAYSCALE else 'UNCHANGED'}")
            img_loaded = cv2.imread(str(source_path_abs), read_flag)
            if img_loaded is None:
                raise ProcessingEngineError(f"Failed to load image file: {source_path_abs.name} with flag {read_flag}")
            source_dtype = img_loaded.dtype
            log.debug(f"Loaded source {source_path_abs.name}, dtype: {source_dtype}, shape: {img_loaded.shape}")

            # --- 2. Initial Preparation (BGR->RGB, Gloss Inversion, MASK handling) ---
            img_prepared = img_loaded  # Start with loaded image
            # MASK Handling (Extract alpha or convert) - Do this BEFORE general color conversions
            if base_map_type == 'MASK':
                log.debug(f"Processing as MASK type for {source_path_abs.name}.")
                shape = img_prepared.shape
                if len(shape) == 3 and shape[2] == 4:
                    log.debug("MASK processing: Extracting alpha channel (4-channel source).")
                    img_prepared = img_prepared[:, :, 3]  # Extract alpha
                elif len(shape) == 3 and shape[2] == 3:
                    log.debug("MASK processing: Converting BGR to Grayscale (3-channel source).")
                    # OpenCV loads as BGR
                    img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGR2GRAY)  # Convert BGR to Gray
                elif len(shape) == 2:
                    log.debug("MASK processing: Source is already grayscale.")
                    # img_prepared remains img_prepared
                else:
                    log.warning(f"MASK processing: Unexpected source shape {shape}. Cannot reliably extract mask.")
                    img_prepared = None  # Cannot process
            else:
                # BGR -> RGB conversion (only for 3/4-channel images not loaded as grayscale)
                if len(img_prepared.shape) == 3 and img_prepared.shape[2] >= 3 and read_flag != cv2.IMREAD_GRAYSCALE:
                    log.debug(f"Converting loaded image from BGR to RGB for {source_path_abs.name}.")
                    if img_prepared.shape[2] == 4:
                        # BGRA -> RGB (alpha is dropped here for non-MASK maps)
                        img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGRA2RGB)
                    else:
                        # BGR -> RGB
                        img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGR2RGB)
                elif len(img_prepared.shape) == 2:
                    log.debug(f"Image {source_path_abs.name} is grayscale, no BGR->RGB conversion needed.")
                # else: log warning handled below

            if img_prepared is None:
                raise ProcessingEngineError("Image data is None after MASK/Color prep.")

            # Gloss -> Roughness Inversion (only if map_type starts with ROUGH and is_gloss_source is True)
            log.debug(f"Gloss Inversion Check: map_type='{map_type}', is_gloss_source={is_gloss_source}")  # DEBUG ADDED
            condition_met = map_type.startswith('ROUGH') and is_gloss_source  # DEBUG ADDED
            log.debug(f"Gloss Inversion Check: Condition met = {condition_met}")  # DEBUG ADDED
            if condition_met:
                log.info(f"Performing Gloss->Roughness inversion for {source_path_abs.name}")
                # Ensure grayscale before inversion
                if len(img_prepared.shape) == 3:
                    log.debug("Gloss Inversion: Converting 3-channel image to grayscale before inversion.")  # DEBUG ADDED
                    img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_RGB2GRAY)  # Use RGB2GRAY as it should be RGB now
                # Log stats *before* inversion (after potential grayscale conversion)
                stats_before = _calculate_image_stats(img_prepared)  # DEBUG ADDED
                log.debug(f"Gloss Inversion: Image stats BEFORE inversion: {stats_before}")  # DEBUG ADDED
                # Normalize based on original source dtype before inversion
                if source_dtype == np.uint16:
                    log.debug("Gloss Inversion: Normalizing uint16 data for inversion.")  # DEBUG ADDED
                    img_float = 1.0 - (img_prepared.astype(np.float32) / 65535.0)
                elif source_dtype == np.uint8:
                    log.debug("Gloss Inversion: Normalizing uint8 data for inversion.")  # DEBUG ADDED
                    img_float = 1.0 - (img_prepared.astype(np.float32) / 255.0)
                else:
                    # Assuming float input is already 0-1 range
                    log.debug("Gloss Inversion: Assuming float data is already normalized for inversion.")  # DEBUG ADDED
                    img_float = 1.0 - img_prepared.astype(np.float32)
                img_prepared = np.clip(img_float, 0.0, 1.0)  # Result is float32
                # Log stats *after* inversion
                stats_after = _calculate_image_stats(img_prepared)  # DEBUG ADDED
                log.debug(f"Gloss Inversion: Image stats AFTER inversion (float32): {stats_after}")  # DEBUG ADDED
                log.debug(f"Inverted gloss map stored as float32 for ROUGH, original dtype: {source_dtype}")

            # Ensure data is float32/uint8/uint16 for resizing compatibility
            if isinstance(img_prepared, np.ndarray) and img_prepared.dtype not in [np.uint8, np.uint16, np.float32, np.float16]:
                log.warning(f"Converting unexpected dtype {img_prepared.dtype} to float32 before resizing.")
                img_prepared = img_prepared.astype(np.float32)

            # --- 3. Resize ---
            if img_prepared is None:
                raise ProcessingEngineError("Image data is None after initial prep.")
            orig_h, orig_w = img_prepared.shape[:2]
            # Get resolutions from static config
            target_dim_px = self.config_obj.image_resolutions.get(target_resolution_key)
            if not target_dim_px:
                raise ProcessingEngineError(f"Target resolution key '{target_resolution_key}' not found in config.")
            # Avoid upscaling check (using static config)
            max_original_dimension = max(orig_w, orig_h)
            # TODO: Add config option for allowing upscale? For now, skip if target > original.
            if target_dim_px > max_original_dimension:
                log.warning(f"Target dimension {target_dim_px}px is larger than original {max_original_dimension}px for {source_path_abs.name}. Skipping resize for {target_resolution_key}.")
                # Store None in cache for this specific resolution to avoid retrying
                self.loaded_data_cache[cache_key] = (None, source_dtype)
                return None, source_dtype  # Indicate resize was skipped
            if orig_w <= 0 or orig_h <= 0:
                raise ProcessingEngineError(f"Invalid original dimensions ({orig_w}x{orig_h}) for {source_path_abs.name}.")
            target_w, target_h = calculate_target_dimensions(orig_w, orig_h, target_dim_px)
            # Lanczos for downscale, cubic otherwise (equal-area case)
            interpolation = cv2.INTER_LANCZOS4 if (target_w * target_h) < (orig_w * orig_h) else cv2.INTER_CUBIC
            log.debug(f"Resizing {source_path_abs.name} from ({orig_w}x{orig_h}) to ({target_w}x{target_h}) for {target_resolution_key}")
            img_resized = cv2.resize(img_prepared, (target_w, target_h), interpolation=interpolation)

            # --- 4. Cache and Return ---
            # Keep resized dtype unless it was gloss-inverted (which is float32)
            final_data_to_cache = img_resized
            if map_type.startswith('ROUGH') and is_gloss_source and final_data_to_cache.dtype != np.float32:
                final_data_to_cache = final_data_to_cache.astype(np.float32)
            log.debug(f"CACHING result for {cache_key}. Shape: {final_data_to_cache.shape}, Dtype: {final_data_to_cache.dtype}")
            self.loaded_data_cache[cache_key] = (final_data_to_cache, source_dtype)
            return final_data_to_cache, source_dtype
        except Exception as e:
            log.error(f"Error in _load_and_transform_source for {source_path_abs.name} at {target_resolution_key}: {e}", exc_info=True)
            # Cache None to prevent retrying on error for this specific key
            self.loaded_data_cache[cache_key] = (None, None)
            return None, None

    def _save_image(self, image_data: "np.ndarray", map_type: str, resolution_key: str,
                    asset_base_name: str, source_info: dict,
                    output_bit_depth_rule: str) -> Optional[Dict]:
        """
        Handles saving an image NumPy array to a temporary file within the engine's temp_dir.
        Uses static configuration from self.config_obj for formats, quality, etc.

        Args:
            image_data: NumPy array containing the image data to save.
            map_type: The standard map type being saved (e.g., "COL", "NRMRGH").
            resolution_key: The resolution key (e.g., "4K").
            asset_base_name: The sanitized base name of the asset.
            source_info: Dictionary containing details about the source(s), e.g.,
                {'original_extension': '.tif', 'source_bit_depth': 16,
                 'involved_extensions': {'.tif', '.png'}, 'max_input_bit_depth': 16}
            output_bit_depth_rule: Rule for determining output bit depth
                ('respect', 'force_8bit', 'force_16bit', 'respect_inputs').

        Returns:
            A dictionary containing details of the saved file (path relative to
            engine's temp_dir, width, height, bit_depth, format) or None if saving failed.
""" if cv2 is None or np is None: log.error("OpenCV or NumPy not available for image saving.") return None if image_data is None: log.error(f"Cannot save image for {map_type} ({resolution_key}): image_data is None.") return None if not self.temp_dir or not self.temp_dir.exists(): log.error(f"Cannot save image for {map_type} ({resolution_key}): Engine temp_dir is invalid.") return None try: h, w = image_data.shape[:2] current_dtype = image_data.dtype log.debug(f"Saving {map_type} ({resolution_key}) for asset '{asset_base_name}'. Input shape: {image_data.shape}, dtype: {current_dtype}") # --- Get Static Config Values --- config = self.config_obj # Alias for brevity primary_fmt_16, fallback_fmt_16 = config.get_16bit_output_formats() fmt_8bit_config = config.get_8bit_output_format() threshold = config.resolution_threshold_for_jpg force_lossless_map_types = config.force_lossless_map_types jpg_quality = config.jpg_quality png_compression_level = config._core_settings.get('PNG_COMPRESSION_LEVEL', 6) target_filename_pattern = config.target_filename_pattern image_resolutions = config.image_resolutions # --- 1. Determine Output Bit Depth --- source_bpc = source_info.get('source_bit_depth', 8) # Default to 8 if missing max_input_bpc = source_info.get('max_input_bit_depth', source_bpc) # For 'respect_inputs' merge rule output_dtype_target, output_bit_depth = np.uint8, 8 # Default if output_bit_depth_rule == 'force_8bit': output_dtype_target, output_bit_depth = np.uint8, 8 elif output_bit_depth_rule == 'force_16bit': output_dtype_target, output_bit_depth = np.uint16, 16 elif output_bit_depth_rule == 'respect': # For individual maps if source_bpc == 16: output_dtype_target, output_bit_depth = np.uint16, 16 # Handle float source? Assume 16-bit output if source was float? Needs clarification. # For now, stick to uint8/16 based on source_bpc. 
elif output_bit_depth_rule == 'respect_inputs': # For merged maps if max_input_bpc == 16: output_dtype_target, output_bit_depth = np.uint16, 16 else: # Default to 8-bit if rule is unknown log.warning(f"Unknown output_bit_depth_rule '{output_bit_depth_rule}'. Defaulting to 8-bit.") output_dtype_target, output_bit_depth = np.uint8, 8 log.debug(f"Target output bit depth: {output_bit_depth}-bit (dtype: {output_dtype_target.__name__}) based on rule '{output_bit_depth_rule}'") # --- 2. Determine Output Format --- output_format, output_ext, save_params, needs_float16 = "", "", [], False base_map_type = _get_base_map_type(map_type) # Use base type for lossless check force_lossless = base_map_type in force_lossless_map_types original_extension = source_info.get('original_extension', '.png') # Primary source ext involved_extensions = source_info.get('involved_extensions', {original_extension}) # For merges target_dim_px = image_resolutions.get(resolution_key, 0) # Get target dimension size # Apply format determination logic (using static config) if force_lossless: log.debug(f"Format forced to lossless for map type '{base_map_type}'.") if output_bit_depth == 16: output_format = primary_fmt_16 if output_format.startswith("exr"): output_ext, needs_float16 = ".exr", True save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF]) else: # Assume PNG if primary 16-bit isn't EXR if output_format != "png": log.warning(f"Primary 16-bit format '{output_format}' not PNG/EXR for forced lossless. 
Using fallback '{fallback_fmt_16}'.") output_format = fallback_fmt_16 if fallback_fmt_16 == "png" else "png" # Ensure PNG output_ext = ".png" save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) else: # 8-bit lossless -> PNG output_format = "png"; output_ext = ".png" save_params = [cv2.IMWRITE_PNG_COMPRESSION, png_compression_level] elif output_bit_depth == 8 and target_dim_px >= threshold: output_format = 'jpg'; output_ext = '.jpg' save_params.extend([cv2.IMWRITE_JPEG_QUALITY, jpg_quality]) log.debug(f"Using JPG format (Quality: {jpg_quality}) for {map_type} at {resolution_key} due to resolution threshold ({target_dim_px} >= {threshold}).") else: # Determine highest format involved (for merges) or use original (for individuals) highest_format_str = 'jpg' # Default lowest relevant_extensions = involved_extensions # Use involved_extensions directly if '.exr' in relevant_extensions: highest_format_str = 'exr' elif '.tif' in relevant_extensions: highest_format_str = 'tif' elif '.png' in relevant_extensions: highest_format_str = 'png' if highest_format_str == 'exr': if output_bit_depth == 16: output_format, output_ext, needs_float16 = "exr", ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF]) else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) elif highest_format_str == 'tif': if output_bit_depth == 16: output_format = primary_fmt_16 if output_format.startswith("exr"): output_ext, needs_float16 = ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF]) else: output_format = "png"; output_ext = ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) elif highest_format_str == 'png': if output_bit_depth == 16: output_format = primary_fmt_16 if output_format.startswith("exr"): 
output_ext, needs_float16 = ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF]) else: output_format = "png"; output_ext = ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) else: # Default to configured 8-bit format if highest was JPG or unknown output_format = fmt_8bit_config; output_ext = f".{output_format}" if output_format == "png": save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) elif output_format == "jpg": save_params.extend([cv2.IMWRITE_JPEG_QUALITY, jpg_quality]) # Final check: JPG must be 8-bit if output_format == "jpg" and output_bit_depth == 16: log.warning(f"Output format is JPG, but target bit depth is 16. Forcing 8-bit for {map_type} ({resolution_key}).") output_dtype_target, output_bit_depth = np.uint8, 8 log.debug(f"Determined save format: {output_format}, ext: {output_ext}, bit_depth: {output_bit_depth}, needs_float16: {needs_float16}") # --- 3. 
Final Data Type Conversion --- img_to_save = image_data.copy() # Work on a copy if output_dtype_target == np.uint8 and img_to_save.dtype != np.uint8: log.debug(f"Converting image data from {img_to_save.dtype} to uint8 for saving.") if img_to_save.dtype == np.uint16: img_to_save = (img_to_save.astype(np.float32) / 65535.0 * 255.0).astype(np.uint8) elif img_to_save.dtype in [np.float16, np.float32]: img_to_save = (np.clip(img_to_save, 0.0, 1.0) * 255.0).astype(np.uint8) else: img_to_save = img_to_save.astype(np.uint8) # Direct cast for other types (e.g., bool) elif output_dtype_target == np.uint16 and img_to_save.dtype != np.uint16: log.debug(f"Converting image data from {img_to_save.dtype} to uint16 for saving.") if img_to_save.dtype == np.uint8: img_to_save = img_to_save.astype(np.uint16) * 257 # Proper 8->16 bit scaling elif img_to_save.dtype in [np.float16, np.float32]: img_to_save = (np.clip(img_to_save, 0.0, 1.0) * 65535.0).astype(np.uint16) else: img_to_save = img_to_save.astype(np.uint16) if needs_float16 and img_to_save.dtype != np.float16: log.debug(f"Converting image data from {img_to_save.dtype} to float16 for EXR saving.") if img_to_save.dtype == np.uint16: img_to_save = (img_to_save.astype(np.float32) / 65535.0).astype(np.float16) elif img_to_save.dtype == np.uint8: img_to_save = (img_to_save.astype(np.float32) / 255.0).astype(np.float16) elif img_to_save.dtype == np.float32: img_to_save = img_to_save.astype(np.float16) else: log.warning(f"Cannot convert {img_to_save.dtype} to float16 for EXR save."); return None # --- 4. 
Final Color Space Conversion (RGB -> BGR for non-EXR) --- img_save_final = img_to_save is_3_channel = len(img_to_save.shape) == 3 and img_to_save.shape[2] == 3 if is_3_channel and not output_format.startswith("exr"): log.debug(f"Converting RGB to BGR for saving {map_type} ({resolution_key}) as {output_format}") try: img_save_final = cv2.cvtColor(img_to_save, cv2.COLOR_RGB2BGR) except Exception as cvt_err: log.error(f"Failed RGB->BGR conversion before save for {map_type} ({resolution_key}): {cvt_err}. Saving original RGB.") img_save_final = img_to_save # Fallback # --- 5. Construct Filename & Save --- filename = target_filename_pattern.format( base_name=asset_base_name, map_type=map_type, resolution=resolution_key, ext=output_ext.lstrip('.') ) output_path_temp = self.temp_dir / filename # Save to engine's temp dir log.debug(f"Attempting to save: {output_path_temp.name} (Format: {output_format}, Dtype: {img_save_final.dtype})") saved_successfully = False actual_format_saved = output_format try: cv2.imwrite(str(output_path_temp), img_save_final, save_params) saved_successfully = True log.info(f" > Saved {map_type} ({resolution_key}, {output_bit_depth}-bit) as {output_format}") except Exception as save_err: log.error(f"Save failed ({output_format}) for {map_type} {resolution_key}: {save_err}") # --- Try Fallback --- if output_bit_depth == 16 and output_format.startswith("exr") and fallback_fmt_16 != output_format and fallback_fmt_16 == "png": log.warning(f"Attempting fallback PNG save for {map_type} {resolution_key}") actual_format_saved = "png"; output_ext = ".png"; filename = target_filename_pattern.format(base_name=asset_base_name, map_type=map_type, resolution=resolution_key, ext="png") output_path_temp = self.temp_dir / filename save_params_fallback = [cv2.IMWRITE_PNG_COMPRESSION, png_compression_level] img_fallback = None; target_fallback_dtype = np.uint16 # Convert original data (before float16 conversion) to uint16 for PNG fallback if img_to_save.dtype == 
np.float16: # This means original was likely float or uint16/8 converted to float16 # Safest is to convert the float16 back to uint16 img_scaled = np.clip(img_to_save.astype(np.float32) * 65535.0, 0, 65535) img_fallback = img_scaled.astype(target_fallback_dtype) elif img_to_save.dtype == target_fallback_dtype: img_fallback = img_to_save # Already uint16 else: log.error(f"Cannot convert {img_to_save.dtype} for PNG fallback."); return None # --- Conditional RGB -> BGR Conversion for fallback --- img_fallback_save_final = img_fallback is_3_channel_fallback = len(img_fallback.shape) == 3 and img_fallback.shape[2] == 3 if is_3_channel_fallback: # PNG is non-EXR log.debug(f"Converting RGB to BGR for fallback PNG save {map_type} ({resolution_key})") try: img_fallback_save_final = cv2.cvtColor(img_fallback, cv2.COLOR_RGB2BGR) except Exception as cvt_err_fb: log.error(f"Failed RGB->BGR conversion for fallback PNG: {cvt_err_fb}. Saving original.") try: cv2.imwrite(str(output_path_temp), img_fallback_save_final, save_params_fallback) saved_successfully = True log.info(f" > Saved {map_type} ({resolution_key}) using fallback PNG") except Exception as fallback_err: log.error(f"Fallback PNG save failed for {map_type} {resolution_key}: {fallback_err}", exc_info=True) else: log.error(f"No suitable fallback available or applicable for failed save of {map_type} ({resolution_key}) as {output_format}.") # --- 6. 
Return Result --- if saved_successfully: return { "path": output_path_temp.relative_to(self.temp_dir), # Store relative path within engine's temp "resolution": resolution_key, "width": w, "height": h, "bit_depth": output_bit_depth, "format": actual_format_saved } else: return None # Indicate save failure except Exception as e: log.error(f"Unexpected error in _save_image for {map_type} ({resolution_key}): {e}", exc_info=True) return None def _process_individual_maps(self, asset_rule: AssetRule, workspace_path: Path, current_asset_metadata: Dict) -> Tuple[Dict[str, Dict[str, Dict]], Dict[str, Dict], str]: """ Processes, resizes, and saves individual map files for a specific asset based on the provided AssetRule and static configuration. Args: asset_rule: The AssetRule object containing file rules for this asset. workspace_path: Path to the directory containing the source files. current_asset_metadata: Mutable metadata dictionary for the current asset (updated directly). Returns: Tuple containing: - processed_maps_details_asset: Dict mapping map_type to resolution details. - image_stats_asset: Dict mapping map_type to calculated image statistics (also added to current_asset_metadata). - aspect_ratio_change_string_asset: String indicating aspect ratio change (also added to current_asset_metadata). 
""" if not self.temp_dir: raise ProcessingEngineError("Engine workspace (temp_dir) not setup.") asset_name = asset_rule.asset_name log.info(f"Processing individual map files for asset '{asset_name}'...") # Initialize results specific to this asset processed_maps_details_asset: Dict[str, Dict[str, Dict]] = defaultdict(dict) image_stats_asset: Dict[str, Dict] = {} # Local dict for stats map_details_asset: Dict[str, Dict] = {} # Store details like source bit depth, gloss inversion aspect_ratio_change_string_asset: str = "N/A" # --- Settings retrieval from static config --- resolutions = self.config_obj.image_resolutions stats_res_key = self.config_obj.calculate_stats_resolution stats_target_dim = resolutions.get(stats_res_key) if not stats_target_dim: log.warning(f"Stats resolution key '{stats_res_key}' not found in config. Stats skipped for '{asset_name}'.") base_name = asset_name # Use the asset name from the rule # --- Aspect Ratio Calculation Setup --- first_map_rule_for_aspect = next((fr for fr in asset_rule.files if fr.item_type_override is not None), None) orig_w_aspect, orig_h_aspect = None, None if first_map_rule_for_aspect: first_res_key = next(iter(resolutions)) # Use first resolution key source_path_abs = workspace_path / first_map_rule_for_aspect.file_path temp_img_for_dims, _ = self._load_and_transform_source( source_path_abs, first_map_rule_for_aspect.item_type_override, first_res_key, is_gloss_source=False, # Added: Not relevant for dimension check, but required by method # self.loaded_data_cache is used internally by the method ) if temp_img_for_dims is not None: orig_h_aspect, orig_w_aspect = temp_img_for_dims.shape[:2] log.debug(f"Got original dimensions ({orig_w_aspect}x{orig_h_aspect}) for aspect ratio calculation from {first_map_rule_for_aspect.file_path}") else: log.warning(f"Could not load image {first_map_rule_for_aspect.file_path} to get original dimensions for aspect ratio.") else: log.warning("No map files found in AssetRule, cannot 
calculate aspect ratio string.") # --- Process Each Individual Map defined in the AssetRule --- for file_rule in asset_rule.files: # --- Check if this file should be processed individually --- # Skip if no item type is assigned, if it's explicitly "EXTRA", or if marked to skip should_skip = ( file_rule.item_type_override is None or file_rule.item_type_override == "EXTRA" or # Explicitly skip "EXTRA" type getattr(file_rule, 'skip_processing', False) ) if should_skip: log.debug(f"Skipping individual processing for {file_rule.file_path} (ItemTypeOverride: {file_rule.item_type_override}, SkipProcessing: {getattr(file_rule, 'skip_processing', False)})") continue # Skip to the next file_rule # --- Proceed with processing for this file_rule --- source_path_rel = Path(file_rule.file_path) # Ensure it's a Path object # IMPORTANT: Use the ENGINE's workspace_path (self.temp_dir) for loading, # as individual maps should have been copied there by the caller (ProcessingTask) # Correction: _process_individual_maps receives the *engine's* temp_dir as workspace_path source_path_abs = workspace_path / source_path_rel map_type = file_rule.item_type_override # Use the explicit map type from the rule # Determine if the source is gloss based on the flag set during prediction # is_gloss_source = map_type in gloss_identifiers # <<< INCORRECT: Re-calculates based on target type is_gloss_source = getattr(file_rule, 'is_gloss_source', False) # <<< CORRECT: Use flag from FileRule object log.debug(f"Using is_gloss_source={is_gloss_source} directly from FileRule for {file_rule.file_path}") # DEBUG ADDED original_extension = source_path_rel.suffix.lower() # Get from path log.info(f"-- Asset '{asset_name}': Processing Individual Map: {map_type} (Source: {source_path_rel.name}, IsGlossSource: {is_gloss_source}) --") # DEBUG: Added flag to log current_map_details = {"derived_from_gloss": is_gloss_source} source_bit_depth_found = None # Track if we've found the bit depth for this map type try: # --- 
Loop through target resolutions from static config --- for res_key, target_dim_px in resolutions.items(): log.debug(f"Processing {map_type} for resolution: {res_key}...") # --- 1. Load and Transform Source (using helper + cache) --- # This now only runs for files that have an item_type_override img_resized, source_dtype = self._load_and_transform_source( source_path_abs=source_path_abs, map_type=map_type, # Pass the specific map type (e.g., ROUGH-1) target_resolution_key=res_key, is_gloss_source=is_gloss_source # self.loaded_data_cache is used internally ) if img_resized is None: # This warning now correctly indicates a failure for a map we *intended* to process log.warning(f"Failed to load/transform source map {source_path_rel} for {res_key}. Skipping resolution.") continue # Skip this resolution # Store source bit depth once found if source_dtype is not None and source_bit_depth_found is None: source_bit_depth_found = 16 if source_dtype == np.uint16 else (8 if source_dtype == np.uint8 else 8) # Default non-uint to 8 current_map_details["source_bit_depth"] = source_bit_depth_found log.debug(f"Stored source bit depth for {map_type}: {source_bit_depth_found}") # --- 2. Calculate Stats (if applicable) --- if res_key == stats_res_key and stats_target_dim: log.debug(f"Calculating stats for {map_type} using {res_key} image...") stats = _calculate_image_stats(img_resized) if stats: image_stats_asset[map_type] = stats # Store locally first else: log.warning(f"Stats calculation failed for {map_type} at {res_key}.") # --- 3. 
Calculate Aspect Ratio Change String (once per asset) --- if aspect_ratio_change_string_asset == "N/A" and orig_w_aspect is not None and orig_h_aspect is not None: target_w_aspect, target_h_aspect = img_resized.shape[1], img_resized.shape[0] # Use current resized dims try: aspect_string = _normalize_aspect_ratio_change(orig_w_aspect, orig_h_aspect, target_w_aspect, target_h_aspect) aspect_ratio_change_string_asset = aspect_string log.debug(f"Stored aspect ratio change string using {res_key}: '{aspect_string}'") except Exception as aspect_err: log.error(f"Failed to calculate aspect ratio change string using {res_key}: {aspect_err}", exc_info=True) aspect_ratio_change_string_asset = "Error" elif aspect_ratio_change_string_asset == "N/A": aspect_ratio_change_string_asset = "Unknown" # Set to unknown if original dims failed # --- 4. Save Image (using helper) --- source_info = { 'original_extension': original_extension, 'source_bit_depth': source_bit_depth_found or 8, # Use found depth or default 'involved_extensions': {original_extension} # Only self for individual maps } # Get bit depth rule solely from the static configuration using the correct method signature bit_depth_rule = self.config_obj.get_bit_depth_rule(map_type) # Pass only map_type save_result = self._save_image( image_data=img_resized, map_type=map_type, resolution_key=res_key, asset_base_name=base_name, source_info=source_info, output_bit_depth_rule=bit_depth_rule # _save_image uses self.config_obj for other settings ) # --- 5. 
Store Result --- if save_result: processed_maps_details_asset.setdefault(map_type, {})[res_key] = save_result # Update overall map detail (e.g., final format) if needed current_map_details["output_format"] = save_result.get("format") else: log.error(f"Failed to save {map_type} at {res_key}.") processed_maps_details_asset.setdefault(map_type, {})[f'error_{res_key}'] = "Save failed" except Exception as map_proc_err: log.error(f"Failed processing map {map_type} from {source_path_rel.name}: {map_proc_err}", exc_info=True) processed_maps_details_asset.setdefault(map_type, {})['error'] = str(map_proc_err) # Store collected details for this map type map_details_asset[map_type] = current_map_details # --- Final Metadata Updates --- # Update the passed-in current_asset_metadata dictionary directly current_asset_metadata["map_details"] = map_details_asset current_asset_metadata["image_stats_1k"] = image_stats_asset # Add collected stats current_asset_metadata["aspect_ratio_change_string"] = aspect_ratio_change_string_asset # Add collected aspect string log.info(f"Finished processing individual map files for asset '{asset_name}'.") # Return details needed for organization, stats and aspect ratio are updated in-place return processed_maps_details_asset, image_stats_asset, aspect_ratio_change_string_asset def _merge_maps(self, asset_rule: AssetRule, workspace_path: Path, processed_maps_details_asset: Dict[str, Dict[str, Dict]], current_asset_metadata: Dict) -> Dict[str, Dict[str, Dict]]: """ Merges channels from different source maps for a specific asset based on static merge rules in configuration, using explicit file paths from the AssetRule. Args: asset_rule: The AssetRule object containing file rules for this asset. workspace_path: Path to the directory containing the source files. processed_maps_details_asset: Details of processed maps (used to find common resolutions). current_asset_metadata: Mutable metadata dictionary for the current asset (updated for stats). 
        Returns:
            Dict[str, Dict[str, Dict]]: Details of the merged maps created for this asset.
        """
        if not self.temp_dir:
            raise ProcessingEngineError("Engine workspace (temp_dir) not setup.")
        asset_name = asset_rule.asset_name
        # Get merge rules from static config
        merge_rules = self.config_obj.map_merge_rules
        log.info(f"Asset '{asset_name}': Applying {len(merge_rules)} map merging rule(s) from static config...")
        # Initialize results for this asset
        merged_maps_details_asset: Dict[str, Dict[str, Dict]] = defaultdict(dict)

        for rule_index, rule in enumerate(merge_rules):
            output_map_type = rule.get("output_map_type")
            inputs_mapping = rule.get("inputs")  # e.g., {"R": "AO", "G": "ROUGH", "B": "METAL"}
            defaults = rule.get("defaults", {})
            rule_bit_depth = rule.get("output_bit_depth", "respect_inputs")
            if not output_map_type or not inputs_mapping:
                log.warning(f"Asset '{asset_name}': Skipping static merge rule #{rule_index+1}: Missing 'output_map_type' or 'inputs'. Rule: {rule}")
                continue
            log.info(f"-- Asset '{asset_name}': Applying merge rule for '{output_map_type}' --")

            # --- Find required SOURCE FileRules within the AssetRule ---
            required_input_file_rules: Dict[str, FileRule] = {}  # map_type -> FileRule
            possible_to_find_sources = True
            input_types_needed = set(inputs_mapping.values())  # e.g., {"AO", "ROUGH", "METAL"}
            for input_type in input_types_needed:
                found_rule_for_type = False
                # Search in the asset_rule's files
                for file_rule in asset_rule.files:
                    # Check if the file_rule's map_type matches the required input type
                    # Handle variants (e.g., ROUGH-1 should match ROUGH)
                    if file_rule.item_type_override and file_rule.item_type_override.startswith(input_type):  # Check override exists and matches
                        # TODO: Add prioritization logic if multiple files match (e.g., prefer non-gloss rough if gloss exists but isn't needed?)
                        # For now, take the first match.
                        required_input_file_rules[input_type] = file_rule
                        found_rule_for_type = True
                        log.debug(f"Found source FileRule for merge input '{input_type}': {file_rule.file_path} (ItemTypeOverride: {file_rule.item_type_override})")  # Gloss status checked during load
                        break  # Found the first matching source for this input type
                if not found_rule_for_type:
                    log.warning(f"Asset '{asset_name}': Required source FileRule for input map type '{input_type}' not found in AssetRule. Cannot perform merge for '{output_map_type}'.")
                    possible_to_find_sources = False
                    break
            if not possible_to_find_sources:
                continue  # Skip this merge rule

            # --- Determine common resolutions based on *processed* maps ---
            # This still seems the most reliable way to know which sizes are actually available
            possible_resolutions_per_input: List[Set[str]] = []
            resolutions_config = self.config_obj.image_resolutions  # Static config
            for input_type in input_types_needed:
                # Find the corresponding processed map details (might be ROUGH-1, ROUGH-2 etc.)
                processed_details_for_input = None
                input_file_rule = required_input_file_rules.get(input_type)
                if input_file_rule:
                    processed_details_for_input = processed_maps_details_asset.get(input_file_rule.item_type_override)  # Use the correct attribute
                if processed_details_for_input:
                    # Only keep resolutions that saved successfully (dict entries without 'error')
                    res_keys = {res for res, details in processed_details_for_input.items() if isinstance(details, dict) and 'error' not in details}
                    if not res_keys:
                        log.warning(f"Asset '{asset_name}': Input map type '{input_type}' (using {input_file_rule.item_type_override if input_file_rule else 'N/A'}) for merge rule '{output_map_type}' has no successfully processed resolutions.")  # Use item_type_override
                        possible_resolutions_per_input = []  # Invalidate if any input has no resolutions
                        break
                    possible_resolutions_per_input.append(res_keys)
                else:
                    # If the input map wasn't processed individually (used_for_merge_only=True)
                    # Assume all configured resolutions are potentially available. Loading will handle skips.
                    log.debug(f"Input map type '{input_type}' for merge rule '{output_map_type}' might not have been processed individually. Assuming all configured resolutions possible.")
                    possible_resolutions_per_input.append(set(resolutions_config.keys()))
            if not possible_resolutions_per_input:
                log.warning(f"Asset '{asset_name}': Cannot determine common resolutions for '{output_map_type}'. Skipping rule.")
                continue
            common_resolutions = set.intersection(*possible_resolutions_per_input)
            if not common_resolutions:
                log.warning(f"Asset '{asset_name}': No common resolutions found among required inputs {input_types_needed} for merge rule '{output_map_type}'. Skipping rule.")
                continue
            log.debug(f"Asset '{asset_name}': Common resolutions for '{output_map_type}': {common_resolutions}")

            # --- Loop through common resolutions ---
            res_order = {k: resolutions_config[k] for k in common_resolutions if k in resolutions_config}
            if not res_order:
                log.warning(f"Asset '{asset_name}': Common resolutions {common_resolutions} do not match config. Skipping merge for '{output_map_type}'.")
                continue
            # Largest resolution first
            sorted_res_keys = sorted(res_order.keys(), key=lambda k: res_order[k], reverse=True)
            base_name = asset_name  # Use current asset's name

            for current_res_key in sorted_res_keys:
                log.debug(f"Asset '{asset_name}': Merging '{output_map_type}' for resolution: {current_res_key}")
                try:
                    loaded_inputs_data = {}  # map_type -> loaded numpy array
                    source_info_for_save = {'involved_extensions': set(), 'max_input_bit_depth': 8}
                    # --- Load required SOURCE maps using helper ---
                    possible_to_load = True
                    target_channels = list(inputs_mapping.keys())  # e.g., ['R', 'G', 'B']
                    for map_type_needed in input_types_needed:  # e.g., {"AO", "ROUGH", "METAL"}
                        file_rule = required_input_file_rules.get(map_type_needed)
                        if not file_rule:
                            log.error(f"Internal Error: FileRule missing for '{map_type_needed}' during merge load.")
                            possible_to_load = False; break
                        source_path_rel_str = file_rule.file_path  # Keep original string if needed
                        source_path_rel = Path(source_path_rel_str)  # Convert to Path object
                        source_path_abs = workspace_path / source_path_rel
                        is_gloss = file_rule.item_type_override in getattr(self.config_obj, 'gloss_map_identifiers', [])
                        original_ext = source_path_rel.suffix.lower()  # Now works on Path object
                        source_info_for_save['involved_extensions'].add(original_ext)
                        log.debug(f"Loading source '{source_path_rel}' for merge input '{map_type_needed}' at {current_res_key} (Gloss: {is_gloss})")
                        img_resized, source_dtype = self._load_and_transform_source(
                            source_path_abs=source_path_abs,
                            map_type=file_rule.item_type_override,  # Use the specific type override from rule (e.g., ROUGH-1)
                            target_resolution_key=current_res_key,
                            is_gloss_source=is_gloss
                            # self.loaded_data_cache used internally
                        )
                        if img_resized is None:
                            log.warning(f"Asset '{asset_name}': Failed to load/transform source '{source_path_rel}' for merge input '{map_type_needed}' at {current_res_key}. Skipping resolution.")
                            possible_to_load = False; break
                        loaded_inputs_data[map_type_needed] = img_resized  # Store by base type (AO, ROUGH)
                        # Track max source bit depth
                        if source_dtype == np.uint16:
                            source_info_for_save['max_input_bit_depth'] = max(source_info_for_save['max_input_bit_depth'], 16)
                        # Add other dtype checks if needed
                    if not possible_to_load:
                        continue

                    # --- Calculate Stats for ROUGH source if used and at stats resolution ---
                    stats_res_key = self.config_obj.calculate_stats_resolution
                    if current_res_key == stats_res_key:
                        log.debug(f"Asset '{asset_name}': Checking for ROUGH source stats for '{output_map_type}' at {stats_res_key}")
                        for target_channel, source_map_type in inputs_mapping.items():
                            if source_map_type == 'ROUGH' and source_map_type in loaded_inputs_data:
                                log.debug(f"Asset '{asset_name}': Calculating stats for ROUGH source (mapped to channel '{target_channel}') for '{output_map_type}' at {stats_res_key}")
                                rough_image_data = loaded_inputs_data[source_map_type]
                                rough_stats = _calculate_image_stats(rough_image_data)
                                if rough_stats:
                                    # Update the mutable metadata dict passed in
                                    stats_dict = current_asset_metadata.setdefault("merged_map_channel_stats", {}).setdefault(output_map_type, {}).setdefault(target_channel, {})
                                    stats_dict[stats_res_key] = rough_stats
                                    log.debug(f"Asset '{asset_name}': Stored ROUGH stats for '{output_map_type}' channel '{target_channel}' at {stats_res_key}: {rough_stats}")
                                else:
                                    log.warning(f"Asset '{asset_name}': Failed to calculate ROUGH stats for '{output_map_type}' channel '{target_channel}' at {stats_res_key}.")

                    # --- Determine dimensions ---
                    first_map_type = next(iter(loaded_inputs_data))
                    h, w = loaded_inputs_data[first_map_type].shape[:2]
                    num_target_channels = len(target_channels)

                    # --- Prepare and Merge Channels ---
                    merged_channels_float32 = []
                    for target_channel in target_channels:  # e.g., 'R', 'G', 'B'
                        source_map_type = inputs_mapping.get(target_channel)  # e.g., "AO", "ROUGH", "METAL"
                        channel_data_float32 = None
                        if source_map_type and source_map_type in loaded_inputs_data:
                            img_input = loaded_inputs_data[source_map_type]  # Get the loaded NumPy array
                            # Ensure input is float32 0-1 range for merging
                            if img_input.dtype == np.uint16:
                                img_float = img_input.astype(np.float32) / 65535.0
                            elif img_input.dtype == np.uint8:
                                img_float = img_input.astype(np.float32) / 255.0
                            elif img_input.dtype == np.float16:
                                img_float = img_input.astype(np.float32)  # Assume float16 is 0-1
                            else:
                                img_float = img_input.astype(np.float32)  # Assume other floats are 0-1
                            num_source_channels = img_float.shape[2] if len(img_float.shape) == 3 else 1
                            # Extract the correct channel
                            if num_source_channels >= 3:
                                if target_channel == 'R': channel_data_float32 = img_float[:, :, 0]
                                elif target_channel == 'G': channel_data_float32 = img_float[:, :, 1]
                                elif target_channel == 'B': channel_data_float32 = img_float[:, :, 2]
                                elif target_channel == 'A' and num_source_channels == 4: channel_data_float32 = img_float[:, :, 3]
                                else: log.warning(f"Target channel '{target_channel}' invalid for 3/4 channel source '{source_map_type}'.")
                            elif num_source_channels == 1 or len(img_float.shape) == 2:
                                # If source is grayscale, use it for R, G, B, or A target channels
                                channel_data_float32 = img_float.reshape(h, w)
                            else:
                                log.warning(f"Unexpected shape {img_float.shape} for source '{source_map_type}'.")
                        # Apply default if channel data couldn't be extracted
                        if channel_data_float32 is None:
                            default_val = defaults.get(target_channel)
                            if default_val is None:
                                raise ProcessingEngineError(f"Missing input/default for target channel '{target_channel}' in merge rule '{output_map_type}'.")
                            log.debug(f"Using default value {default_val} for target channel '{target_channel}' in '{output_map_type}'.")
                            channel_data_float32 = np.full((h, w), float(default_val), dtype=np.float32)
                        merged_channels_float32.append(channel_data_float32)
                    if not merged_channels_float32 or len(merged_channels_float32) != num_target_channels:
                        raise ProcessingEngineError(f"Channel count mismatch during merge for '{output_map_type}'. Expected {num_target_channels}, got {len(merged_channels_float32)}.")
                    merged_image_float32 = cv2.merge(merged_channels_float32)
                    log.debug(f"Merged channels for '{output_map_type}' ({current_res_key}). Result shape: {merged_image_float32.shape}, dtype: {merged_image_float32.dtype}")

                    # --- Save Merged Map using Helper ---
                    save_result = self._save_image(
                        image_data=merged_image_float32,  # Pass the merged float32 data
                        map_type=output_map_type,
                        resolution_key=current_res_key,
                        asset_base_name=base_name,
                        source_info=source_info_for_save,  # Pass collected source info
                        output_bit_depth_rule=rule_bit_depth  # Pass the rule's requirement
                        # _save_image uses self.config_obj for other settings
                    )
                    # --- Record details locally ---
                    if save_result:
                        merged_maps_details_asset[output_map_type][current_res_key] = save_result
                    else:
                        log.error(f"Asset '{asset_name}': Failed to save merged map '{output_map_type}' at resolution '{current_res_key}'.")
                        merged_maps_details_asset.setdefault(output_map_type, {})[f'error_{current_res_key}'] = "Save failed via helper"
                except Exception as merge_res_err:
                    log.error(f"Asset '{asset_name}': Failed merging '{output_map_type}' at resolution '{current_res_key}': {merge_res_err}", exc_info=True)
                    # Store error locally for this asset
                    merged_maps_details_asset.setdefault(output_map_type, {})[f'error_{current_res_key}'] = str(merge_res_err)

        log.info(f"Asset '{asset_name}': Finished applying map merging rules.")
        # Return the details for this asset
        return merged_maps_details_asset

    def _generate_metadata_file(self, source_rule: SourceRule, asset_rule: AssetRule, current_asset_metadata: Dict, processed_maps_details_asset: Dict[str, Dict[str, Dict]], merged_maps_details_asset: Dict[str, Dict[str, Dict]]) -> Path:
        """
        Gathers metadata for a specific asset based on the AssetRule and processing results,
        and writes it to a temporary JSON file in the engine's temp_dir.

        Args:
            source_rule: The parent SourceRule (supplies supplier identifier and preset name).
            asset_rule: The AssetRule object for this asset.
            current_asset_metadata: Base metadata dictionary (already contains name, category,
                archetype, stats, aspect ratio, map_details).
            processed_maps_details_asset: Details of processed maps for this asset.
            merged_maps_details_asset: Details of merged maps for this asset.

        Returns:
            Path: The path to the generated temporary metadata file.

        Raises:
            ProcessingEngineError: If the workspace is missing or the file cannot be written.
        """
        if not self.temp_dir:
            raise ProcessingEngineError("Engine workspace (temp_dir) not setup.")
        asset_name = asset_rule.asset_name
        if not asset_name:
            log.warning("Asset name missing during metadata generation, file may be incomplete or incorrectly named.")
            asset_name = "UnknownAsset_Metadata"  # Fallback for filename
        log.info(f"Generating metadata file for asset '{asset_name}'...")
        # Start with the base metadata passed in (already contains name, category, archetype, stats, aspect, map_details)
        final_metadata = current_asset_metadata.copy()
        # Use the supplier identifier determined by rules/selection (from SourceRule)
        final_metadata["supplier_name"] = source_rule.supplier_identifier or self.config_obj.supplier_name  # Fallback to config if empty

        # Populate map resolution details from processing results.
        # Only resolutions that saved successfully (no 'error' key) are listed.
        final_metadata["processed_map_resolutions"] = {}
        for map_type, res_dict in processed_maps_details_asset.items():
            keys = [res for res, d in res_dict.items() if isinstance(d, dict) and 'error' not in d]
            if keys:
                final_metadata["processed_map_resolutions"][map_type] = sorted(keys)
        final_metadata["merged_map_resolutions"] = {}
        for map_type, res_dict in merged_maps_details_asset.items():
            keys = [res for res, d in res_dict.items() if isinstance(d, dict) and 'error' not in d]
            if keys:
                final_metadata["merged_map_resolutions"][map_type] = sorted(keys)
        # Determine maps present based on successful processing for this asset
        final_metadata["maps_present"] = sorted(list(processed_maps_details_asset.keys()))
        final_metadata["merged_maps"] = sorted(list(merged_maps_details_asset.keys()))

        # Determine shader features based on this asset's maps and rules
        features = set()
        map_details_asset = final_metadata.get("map_details", {})  # Get from metadata dict
        for map_type, details in map_details_asset.items():
            base_map_type = _get_base_map_type(map_type)
            # Check standard feature types
            if base_map_type in ["SSS", "FUZZ", "MASK", "TRANSMISSION", "EMISSION", "CLEARCOAT"]:  # Add more as needed
                features.add(base_map_type)
            if details.get("derived_from_gloss"):
                features.add("InvertedGloss")
            # Check if any resolution was saved as 16-bit
            res_details = processed_maps_details_asset.get(map_type, {})
            if any(res_info.get("bit_depth") == 16 for res_info in res_details.values() if isinstance(res_info, dict)):
                features.add(f"16bit_{base_map_type}")
        # Check merged maps for 16-bit output
        for map_type, res_dict in merged_maps_details_asset.items():
            base_map_type = _get_base_map_type(map_type)
            if any(res_info.get("bit_depth") == 16 for res_info in res_dict.values() if isinstance(res_info, dict)):
                features.add(f"16bit_{base_map_type}")
        final_metadata["shader_features"] = sorted(list(features))

        # Determine source files in this asset's Extra folder based on FileRule category
        source_files_in_extra_set = set()
        for file_rule in asset_rule.files:
            if file_rule.item_type_override is None:
                # Assume files without an assigned type are extra/ignored/unmatched
                source_files_in_extra_set.add(str(file_rule.file_path))
        final_metadata["source_files_in_extra"] = sorted(list(source_files_in_extra_set))

        # Add processing info (using static config for preset name)
        final_metadata["_processing_info"] = {
            "preset_used": getattr(source_rule, 'preset_name', self.config_obj.preset_name),  # Use preset name from SourceRule (fallback to static config)
            "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "input_source": source_rule.supplier_identifier or "Unknown",  # Use identifier from parent SourceRule
        }
        # Sort lists just before writing (defensive; most were sorted on assignment above)
        for key in ["maps_present", "merged_maps", "shader_features", "source_files_in_extra"]:
            if key in final_metadata and isinstance(final_metadata[key], list):
                final_metadata[key].sort()

        # Use asset name in temporary filename to avoid conflicts
        # Use static config for the base metadata filename
        temp_metadata_filename = f"{asset_name}_{self.config_obj.metadata_filename}"
        output_path = self.temp_dir / temp_metadata_filename
        log.debug(f"Writing metadata for asset '{asset_name}' to temporary file: {output_path}")
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                # Use a custom encoder if numpy types might be present (though they shouldn't be at this stage)
                json.dump(final_metadata, f, indent=4, ensure_ascii=False, sort_keys=True)
            log.info(f"Metadata file '{self.config_obj.metadata_filename}' generated successfully for asset '{asset_name}'.")
            return output_path  # Return the path to the temporary file
        except Exception as e:
            raise ProcessingEngineError(f"Failed to write metadata file {output_path} for asset '{asset_name}': {e}") from e

    def _organize_output_files(self, asset_rule: AssetRule, workspace_path: Path, supplier_identifier: str, output_base_path: Path, processed_maps_details_asset: Dict[str, Dict[str, Dict]], merged_maps_details_asset: Dict[str, Dict[str, Dict]], temp_metadata_path: Path):
        """
        Moves/copies processed files for a specific asset from the engine's temp dir
        and copies EXTRA files from the original workspace to the final output structure,
        based on the AssetRule and static config.

        Args:
            asset_rule: The AssetRule object for this asset.
            workspace_path: Path to the original workspace containing source files.
            supplier_identifier: The supplier identifier from the SourceRule.
            output_base_path: The final base output directory.
            processed_maps_details_asset: Details of processed maps for this asset.
            merged_maps_details_asset: Details of merged maps for this asset.
            temp_metadata_path: Path to the temporary metadata file for this asset.
""" if not self.temp_dir or not self.temp_dir.exists(): raise ProcessingEngineError("Engine temp workspace missing.") asset_name = asset_rule.asset_name if not asset_name: raise ProcessingEngineError("Asset name missing for organization.") # Get structure names from static config and arguments # supplier_name = self.config_obj.supplier_name # <<< ISSUE: Uses config supplier metadata_filename = self.config_obj.metadata_filename extra_subdir_name = self.config_obj.extra_files_subdir # Use the supplier identifier passed from the SourceRule if not supplier_identifier: log.warning(f"Asset '{asset_name}': Supplier identifier missing in SourceRule. Using fallback 'UnknownSupplier'.") supplier_identifier = "UnknownSupplier" supplier_sanitized = _sanitize_filename(supplier_identifier) # <<< FIX: Use passed identifier asset_name_sanitized = _sanitize_filename(asset_name) final_dir = output_base_path / supplier_sanitized / asset_name_sanitized log.info(f"Organizing output files for asset '{asset_name_sanitized}' (Supplier: '{supplier_identifier}') into: {final_dir}") try: # Overwrite logic is handled in the main process() method before calling this final_dir.mkdir(parents=True, exist_ok=True) except Exception as e: raise ProcessingEngineError(f"Failed to create final dir {final_dir} for asset '{asset_name_sanitized}': {e}") from e # --- Helper for moving files from engine's temp dir --- def _safe_move(src_rel_path: Path | None, dest_dir: Path, file_desc: str): if not src_rel_path: log.warning(f"Asset '{asset_name_sanitized}': Missing src relative path for {file_desc}."); return source_abs = self.temp_dir / src_rel_path # Path relative to engine's temp # Use the original filename from the source path for the destination dest_abs = dest_dir / src_rel_path.name try: if source_abs.exists(): log.debug(f"Asset '{asset_name_sanitized}': Moving {file_desc}: {source_abs.name} -> {dest_dir.relative_to(output_base_path)}/") dest_dir.mkdir(parents=True, exist_ok=True) 
shutil.move(str(source_abs), str(dest_abs)) else: log.warning(f"Asset '{asset_name_sanitized}': Source file missing in engine temp for {file_desc}: {source_abs}") except Exception as e: log.error(f"Asset '{asset_name_sanitized}': Failed moving {file_desc} '{source_abs.name}': {e}", exc_info=True) # --- Move Processed/Merged Maps --- for details_dict in [processed_maps_details_asset, merged_maps_details_asset]: for map_type, res_dict in details_dict.items(): if 'error' in res_dict: continue for res_key, details in res_dict.items(): if isinstance(details, dict) and 'path' in details: # details['path'] is relative to engine's temp dir _safe_move(details['path'], final_dir, f"{map_type} ({res_key})") # --- Move Models (copy from original workspace) --- # Models are not processed/saved in temp, copy from original workspace # This requires the original workspace path, which isn't directly available here. # TODO: Revisit how models are handled. Should they be copied to temp first? # For now, assume models are handled by the caller or need adjustment. # log.warning("Model file organization not implemented in ProcessingEngine._organize_output_files yet.") # Find model FileRules and copy from workspace_path (passed to process) # This needs workspace_path access. Let's assume it's available via self for now, though it's not ideal. # Correction: workspace_path is not stored in self. Pass it down or handle differently. # Let's assume the caller handles model copying for now. 
# --- Move Metadata File --- if temp_metadata_path and temp_metadata_path.exists(): # temp_metadata_path is absolute path within engine's temp dir final_metadata_path = final_dir / metadata_filename # Use standard name from config try: log.debug(f"Asset '{asset_name_sanitized}': Moving metadata file: {temp_metadata_path.name} -> {final_metadata_path.relative_to(output_base_path)}") shutil.move(str(temp_metadata_path), str(final_metadata_path)) except Exception as e: log.error(f"Asset '{asset_name_sanitized}': Failed moving metadata file '{temp_metadata_path.name}': {e}", exc_info=True) else: log.warning(f"Asset '{asset_name_sanitized}': Temporary metadata file path missing or file does not exist: {temp_metadata_path}") # --- Handle "EXTRA" Files (copy from original workspace) --- extra_dir = final_dir / extra_subdir_name copied_extra_files = [] for file_rule in asset_rule.files: if file_rule.item_type_override == "EXTRA": try: source_rel_path = Path(file_rule.file_path) source_abs = workspace_path / source_rel_path dest_abs = extra_dir / source_rel_path.name # Place in Extra subdir, keep original name if source_abs.is_file(): log.debug(f"Asset '{asset_name_sanitized}': Copying EXTRA file: {source_abs.name} -> {extra_dir.relative_to(output_base_path)}/") extra_dir.mkdir(parents=True, exist_ok=True) shutil.copy2(str(source_abs), str(dest_abs)) # copy2 preserves metadata copied_extra_files.append(source_rel_path.name) else: log.warning(f"Asset '{asset_name_sanitized}': Source file marked as EXTRA not found in workspace: {source_abs}") except Exception as copy_err: log.error(f"Asset '{asset_name_sanitized}': Failed copying EXTRA file '{file_rule.file_path}': {copy_err}", exc_info=True) if copied_extra_files: log.info(f"Asset '{asset_name_sanitized}': Copied {len(copied_extra_files)} EXTRA file(s) to '{extra_subdir_name}' subdirectory.") log.info(f"Finished organizing output for asset '{asset_name_sanitized}'.") # --- End of ProcessingEngine Class ---