# processing_engine.py import os import math import shutil import tempfile import logging import json import re import time from pathlib import Path from typing import List, Dict, Tuple, Optional, Set from collections import defaultdict # Attempt to import image processing libraries try: import cv2 import numpy as np except ImportError: print("ERROR: Missing required image processing libraries. Please install opencv-python and numpy:") print("pip install opencv-python numpy") # Allow import to fail but log error; execution will likely fail later cv2 = None np = None # Attempt to import OpenEXR - Check if needed for advanced EXR flags/types try: import OpenEXR import Imath _HAS_OPENEXR = True except ImportError: _HAS_OPENEXR = False # Log this information - basic EXR might still work via OpenCV logging.debug("Optional 'OpenEXR' python package not found. EXR saving relies on OpenCV's built-in support.") # Import project-specific modules try: from configuration import Configuration, ConfigurationError from rule_structure import SourceRule, AssetRule, FileRule # Import necessary structures from utils.path_utils import generate_path_from_pattern # <-- ADDED IMPORT except ImportError: print("ERROR: Cannot import Configuration or rule_structure classes.") print("Ensure configuration.py and rule_structure.py are in the same directory or Python path.") # Allow import to fail but log error; execution will likely fail later Configuration = None SourceRule = None AssetRule = None FileRule = None # Use logger defined in main.py (or configure one here if run standalone) log = logging.getLogger(__name__) # Basic config if logger hasn't been set up elsewhere (e.g., during testing) if not log.hasHandlers(): logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') # --- Custom Exception --- class ProcessingEngineError(Exception): """Custom exception for errors during processing engine operations.""" pass # --- Helper Functions (Moved from 
AssetProcessor or kept static) --- def _is_power_of_two(n: int) -> bool: """Checks if a number is a power of two.""" return (n > 0) and (n & (n - 1) == 0) def get_nearest_pot(value: int) -> int: """Finds the nearest power of two to the given value.""" if value <= 0: return 1 # Or raise error, POT must be positive if _is_power_of_two(value): return value # Calculate the powers of two below and above the value lower_pot = 1 << (value.bit_length() - 1) upper_pot = 1 << value.bit_length() # Determine which power of two is closer if (value - lower_pot) < (upper_pot - value): return lower_pot else: return upper_pot def calculate_target_dimensions(orig_w, orig_h, target_max_dim) -> tuple[int, int]: """ Calculates target dimensions by first scaling to fit target_max_dim while maintaining aspect ratio, then finding the nearest power-of-two value for each resulting dimension (Stretch/Squash to POT). """ if orig_w <= 0 or orig_h <= 0: # Fallback to target_max_dim if original dimensions are invalid pot_dim = get_nearest_pot(target_max_dim) log.warning(f"Invalid original dimensions ({orig_w}x{orig_h}). Falling back to nearest POT of target_max_dim: {pot_dim}x{pot_dim}") return (pot_dim, pot_dim) # Step 1: Calculate intermediate dimensions maintaining aspect ratio ratio = orig_w / orig_h if ratio > 1: # Width is dominant scaled_w = target_max_dim scaled_h = max(1, round(scaled_w / ratio)) else: # Height is dominant or square scaled_h = target_max_dim scaled_w = max(1, round(scaled_h * ratio)) # Step 2: Find the nearest power of two for each scaled dimension pot_w = get_nearest_pot(scaled_w) pot_h = get_nearest_pot(scaled_h) log.debug(f"POT Calc: Orig=({orig_w}x{orig_h}), MaxDim={target_max_dim} -> Scaled=({scaled_w}x{scaled_h}) -> POT=({pot_w}x{pot_h})") return int(pot_w), int(pot_h) def _calculate_image_stats(image_data: np.ndarray) -> dict | None: """ Calculates min, max, mean for a given numpy image array. Handles grayscale and multi-channel images. 
Converts to float64 for calculation. """ if image_data is None: log.warning("Attempted to calculate stats on None image data.") return None if np is None: log.error("Numpy not available for stats calculation.") return None try: # Use float64 for calculations to avoid potential overflow/precision issues data_float = image_data.astype(np.float64) # Normalize data_float based on original dtype before calculating stats if image_data.dtype == np.uint16: log.debug("Stats calculation: Normalizing uint16 data to 0-1 range.") data_float /= 65535.0 elif image_data.dtype == np.uint8: log.debug("Stats calculation: Normalizing uint8 data to 0-1 range.") data_float /= 255.0 # Assuming float inputs are already in 0-1 range or similar log.debug(f"Stats calculation: data_float dtype: {data_float.dtype}, shape: {data_float.shape}") # Log a few sample values to check range after normalization if data_float.size > 0: sample_values = data_float.flatten()[:10] # Get first 10 values log.debug(f"Stats calculation: Sample values (first 10) after normalization: {sample_values.tolist()}") if len(data_float.shape) == 2: # Grayscale (H, W) min_val = float(np.min(data_float)) max_val = float(np.max(data_float)) mean_val = float(np.mean(data_float)) stats = {"min": min_val, "max": max_val, "mean": mean_val} log.debug(f"Calculated Grayscale Stats: Min={min_val:.4f}, Max={max_val:.4f}, Mean={mean_val:.4f}") elif len(data_float.shape) == 3: # Color (H, W, C) channels = data_float.shape[2] min_val = [float(v) for v in np.min(data_float, axis=(0, 1))] max_val = [float(v) for v in np.max(data_float, axis=(0, 1))] mean_val = [float(v) for v in np.mean(data_float, axis=(0, 1))] # Assume data is RGB order after potential conversion in _load_and_transform_source stats = {"min": min_val, "max": max_val, "mean": mean_val} log.debug(f"Calculated {channels}-Channel Stats (RGB order): Min={min_val}, Max={max_val}, Mean={mean_val}") else: log.warning(f"Cannot calculate stats for image with unsupported shape 
{data_float.shape}") return None return stats except Exception as e: log.error(f"Error calculating image stats: {e}", exc_info=True) # Log exception info return {"error": str(e)} def _sanitize_filename(name: str) -> str: """Removes or replaces characters invalid for filenames/directory names.""" if not isinstance(name, str): name = str(name) name = re.sub(r'[^\w.\-]+', '_', name) # Allow alphanumeric, underscore, hyphen, dot name = re.sub(r'_+', '_', name) name = name.strip('_') if not name: name = "invalid_name" return name def _normalize_aspect_ratio_change(original_width, original_height, resized_width, resized_height, decimals=2): """ Calculates the aspect ratio change string (e.g., "EVEN", "X133"). Returns the string representation. """ if original_width <= 0 or original_height <= 0: log.warning("Cannot calculate aspect ratio change with zero original dimensions.") return "InvalidInput" # Avoid division by zero if resize resulted in zero dimensions (shouldn't happen with checks) if resized_width <= 0 or resized_height <= 0: log.warning("Cannot calculate aspect ratio change with zero resized dimensions.") return "InvalidResize" # Original logic from user feedback width_change_percentage = ((resized_width - original_width) / original_width) * 100 height_change_percentage = ((resized_height - original_height) / original_height) * 100 normalized_width_change = width_change_percentage / 100 normalized_height_change = height_change_percentage / 100 normalized_width_change = min(max(normalized_width_change + 1, 0), 2) normalized_height_change = min(max(normalized_height_change + 1, 0), 2) # Handle potential zero division if one dimension change is exactly -100% (normalized to 0) # If both are 0, aspect ratio is maintained. If one is 0, the other dominates. 
if normalized_width_change == 0 and normalized_height_change == 0: closest_value_to_one = 1.0 # Avoid division by zero, effectively scale_factor = 1 elif normalized_width_change == 0: closest_value_to_one = abs(normalized_height_change) elif normalized_height_change == 0: closest_value_to_one = abs(normalized_width_change) else: closest_value_to_one = min(abs(normalized_width_change), abs(normalized_height_change)) # Add a small epsilon to avoid division by zero if closest_value_to_one is extremely close to 0 epsilon = 1e-9 scale_factor = 1 / (closest_value_to_one + epsilon) if abs(closest_value_to_one) < epsilon else 1 / closest_value_to_one scaled_normalized_width_change = scale_factor * normalized_width_change scaled_normalized_height_change = scale_factor * normalized_height_change output_width = round(scaled_normalized_width_change, decimals) output_height = round(scaled_normalized_height_change, decimals) # Convert to int if exactly 1.0 after rounding if abs(output_width - 1.0) < epsilon: output_width = 1 if abs(output_height - 1.0) < epsilon: output_height = 1 # Determine output string if original_width == original_height or abs(output_width - output_height) < epsilon: output = "EVEN" elif output_width != 1 and output_height == 1: output = f"X{str(output_width).replace('.', '')}" elif output_height != 1 and output_width == 1: output = f"Y{str(output_height).replace('.', '')}" else: # Both changed relative to each other output = f"X{str(output_width).replace('.', '')}Y{str(output_height).replace('.', '')}" log.debug(f"Aspect ratio change calculated: Orig=({original_width}x{original_height}), Resized=({resized_width}x{resized_height}) -> String='{output}'") return output # --- Processing Engine Class --- class ProcessingEngine: """ Handles the core processing pipeline for assets based on explicit rules provided in a SourceRule object and static configuration. It does not perform classification, prediction, or rule fallback internally. 
""" def __init__(self, config_obj: Configuration): """ Initializes the processing engine with static configuration. Args: config_obj: The loaded Configuration object containing static settings. """ if cv2 is None or np is None or Configuration is None or SourceRule is None: raise ProcessingEngineError("Essential libraries (OpenCV, NumPy) or classes (Configuration, SourceRule) are not available.") if not isinstance(config_obj, Configuration): raise ProcessingEngineError("config_obj must be a valid Configuration object.") self.config_obj: Configuration = config_obj self.temp_dir: Path | None = None # Path to the temporary working directory for a process run self.loaded_data_cache: dict = {} # Cache for loaded/resized data within a single process call log.debug("ProcessingEngine initialized.") def process( self, source_rule: SourceRule, workspace_path: Path, output_base_path: Path, overwrite: bool = False, incrementing_value: Optional[str] = None, # <-- ADDED sha5_value: Optional[str] = None # <-- ADDED ) -> Dict[str, List[str]]: """ Executes the processing pipeline for all assets defined in the SourceRule. Args: source_rule: The SourceRule object containing explicit instructions for all assets and files. workspace_path: The path to the directory containing the source files (e.g., extracted archive). output_base_path: The base directory where processed output will be saved. overwrite: If True, forces reprocessing even if output exists for an asset. incrementing_value: Optional incrementing value for path tokens. sha5_value: Optional SHA5 hash value for path tokens. 
        Returns:
            Dict[str, List[str]]: A dictionary summarizing the status of each asset:
                {"processed": [asset_name1, ...], "skipped": [asset_name2, ...], "failed": [asset_name3, ...]}
        """
        log.info(f"VERIFY: ProcessingEngine.process called with rule for input: {source_rule.input_path}")  # DEBUG Verify
        log.debug(f" VERIFY Rule Details: {source_rule}")  # DEBUG Verify (Optional detailed log)

        # --- Validate inputs up front so failures are unambiguous ---
        if not isinstance(source_rule, SourceRule):
            raise ProcessingEngineError("process() requires a valid SourceRule object.")
        if not isinstance(workspace_path, Path) or not workspace_path.is_dir():
            raise ProcessingEngineError(f"Invalid workspace path provided: {workspace_path}")
        if not isinstance(output_base_path, Path):
            raise ProcessingEngineError(f"Invalid output base path provided: {output_base_path}")

        log.info(f"ProcessingEngine starting process for {len(source_rule.assets)} asset(s) defined in SourceRule.")
        overall_status = {"processed": [], "skipped": [], "failed": []}
        self.loaded_data_cache = {}  # Reset cache for this run

        # Store incoming optional values for use in path generation
        self.current_incrementing_value = incrementing_value
        self.current_sha5_value = sha5_value
        log.debug(f"Received incrementing_value: {self.current_incrementing_value}, sha5_value: {self.current_sha5_value}")

        # Use a temporary directory for intermediate files (like saved maps)
        try:
            self.temp_dir = Path(tempfile.mkdtemp(prefix=self.config_obj.temp_dir_prefix))
            log.debug(f"Created temporary workspace for engine: {self.temp_dir}")

            # --- Loop through each asset defined in the SourceRule ---
            # Per-asset failures are caught and recorded; processing continues
            # with the next asset.
            for asset_rule in source_rule.assets:
                asset_name = asset_rule.asset_name
                log.info(f"--- Processing asset: '{asset_name}' ---")
                # NOTE(review): these three flags are assigned but never read
                # afterwards — candidates for removal.
                asset_processed = False
                asset_skipped = False
                asset_failed = False
                temp_metadata_path_asset = None  # Track metadata file for this asset
                try:
                    # --- Determine Effective Supplier (Override > Identifier > Fallback) ---
                    effective_supplier = source_rule.supplier_override  # Prioritize override
                    if effective_supplier is None:
                        effective_supplier = source_rule.supplier_identifier  # Fallback to original identifier
                    if not effective_supplier:  # Check if still None or empty
                        log.warning(f"Asset '{asset_name}': Supplier identifier missing from rule and override. Using fallback 'UnknownSupplier'.")
                        effective_supplier = "UnknownSupplier"  # Final fallback
                    log.debug(f"Asset '{asset_name}': Effective supplier determined as '{effective_supplier}' (Override: '{source_rule.supplier_override}', Original: '{source_rule.supplier_identifier}')")

                    # --- Skip Check (using effective supplier) ---
                    supplier_sanitized = _sanitize_filename(effective_supplier)
                    asset_name_sanitized = _sanitize_filename(asset_name)
                    final_dir = output_base_path / supplier_sanitized / asset_name_sanitized
                    # NOTE(review): metadata_file_path is computed here but not
                    # used in this method — verify whether it can be dropped.
                    metadata_file_path = final_dir / self.config_obj.metadata_filename  # Metadata filename still comes from config
                    log.debug(f"Checking for existing output/overwrite at: {final_dir} (using effective supplier: '{effective_supplier}')")
                    if not overwrite and final_dir.exists():
                        log.info(f"Output directory found for asset '{asset_name_sanitized}' (Supplier: '{effective_supplier}') and overwrite is False. Skipping.")
                        overall_status["skipped"].append(asset_name)
                        asset_skipped = True
                        continue  # Skip to the next asset
                    elif overwrite and final_dir.exists():
                        log.warning(f"Output directory exists for '{asset_name_sanitized}' (Supplier: '{effective_supplier}') and overwrite is True. Removing existing directory: {final_dir}")
                        try:
                            shutil.rmtree(final_dir)
                        except Exception as rm_err:
                            raise ProcessingEngineError(f"Failed to remove existing output directory {final_dir} during overwrite: {rm_err}") from rm_err

                    # --- Prepare Asset Metadata ---
                    # Start with common metadata from the rule, add asset name
                    current_asset_metadata = asset_rule.common_metadata.copy()
                    current_asset_metadata["asset_name"] = asset_name
                    # Use the EFFECTIVE supplier here
                    current_asset_metadata["supplier_name"] = effective_supplier
                    # Add other fields that will be populated
                    current_asset_metadata["maps_present"] = []
                    current_asset_metadata["merged_maps"] = []
                    current_asset_metadata["shader_features"] = []
                    current_asset_metadata["source_files_in_extra"] = []
                    current_asset_metadata["image_stats_1k"] = {}
                    current_asset_metadata["map_details"] = {}
                    current_asset_metadata["aspect_ratio_change_string"] = "N/A"
                    current_asset_metadata["merged_map_channel_stats"] = {}  # Initialize for stats

                    # --- Process Individual Maps ---
                    # NOTE(review): image_stats_asset and
                    # aspect_ratio_change_string_asset are not used below; the
                    # helper updates current_asset_metadata in place instead.
                    processed_maps_details_asset, image_stats_asset, aspect_ratio_change_string_asset = self._process_individual_maps(
                        asset_rule=asset_rule,
                        workspace_path=workspace_path,  # Use the workspace path received by process() (contains prepared files)
                        current_asset_metadata=current_asset_metadata  # Pass mutable dict
                    )
                    # Update metadata with results (stats and aspect ratio are updated directly in current_asset_metadata by the method)
                    # map_details are also updated directly in current_asset_metadata

                    # --- Merge Maps ---
                    merged_maps_details_asset = self._merge_maps(
                        asset_rule=asset_rule,
                        workspace_path=workspace_path,
                        processed_maps_details_asset=processed_maps_details_asset,  # Needed to find resolutions
                        current_asset_metadata=current_asset_metadata  # Pass mutable dict for stats
                    )

                    # --- Generate Metadata ---
                    # Pass effective_supplier instead of the whole source_rule
                    temp_metadata_path_asset = self._generate_metadata_file(
                        effective_supplier=effective_supplier,  # Pass the determined supplier
                        asset_rule=asset_rule,
                        current_asset_metadata=current_asset_metadata,  # Pass the populated dict
                        processed_maps_details_asset=processed_maps_details_asset,
                        merged_maps_details_asset=merged_maps_details_asset
                    )

                    # --- Organize Output ---
                    # Pass effective_supplier instead of source_rule.supplier_identifier
                    self._organize_output_files(
                        asset_rule=asset_rule,
                        workspace_path=workspace_path,  # Pass the original workspace path
                        supplier_identifier=effective_supplier,  # Pass the determined supplier
                        output_base_path=output_base_path,  # Pass output path
                        processed_maps_details_asset=processed_maps_details_asset,
                        merged_maps_details_asset=merged_maps_details_asset,
                        temp_metadata_path=temp_metadata_path_asset
                    )

                    log.info(f"--- Asset '{asset_name}' processed successfully (Supplier: {effective_supplier}). ---")
                    overall_status["processed"].append(asset_name)
                    asset_processed = True
                except Exception as asset_err:
                    log.error(f"--- Failed processing asset '{asset_name}': {asset_err} ---", exc_info=True)
                    overall_status["failed"].append(asset_name)
                    asset_failed = True
                    # Continue to the next asset

            log.info(f"ProcessingEngine finished. Summary: {overall_status}")
            return overall_status
        except Exception as e:
            # Unexpected engine-level failure (outside the per-asset loop).
            log.exception(f"Processing engine failed unexpectedly: {e}")
            # Ensure all assets not processed/skipped are marked as failed
            processed_or_skipped = set(overall_status["processed"] + overall_status["skipped"])
            for asset_rule in source_rule.assets:
                if asset_rule.asset_name not in processed_or_skipped:
                    overall_status["failed"].append(asset_rule.asset_name)
            return overall_status  # Return partial status if possible
        finally:
            self._cleanup_workspace()

    def _cleanup_workspace(self):
        """Removes the temporary workspace directory if it exists."""
        if self.temp_dir and self.temp_dir.exists():
            try:
                log.debug(f"Cleaning up engine temporary workspace: {self.temp_dir}")
                # Ignore errors during cleanup (e.g., permission errors on copied .git files)
                shutil.rmtree(self.temp_dir, ignore_errors=True)
                self.temp_dir = None
                log.debug("Engine temporary workspace cleaned up successfully.")
            except Exception as e:
                log.error(f"Failed to remove engine temporary workspace {self.temp_dir}: {e}", exc_info=True)
        self.loaded_data_cache = {}  # Clear cache after cleanup

    def _get_ftd_key_from_override(self, override_string: str) -> Optional[str]:
        """
        Attempts to derive a base FILE_TYPE_DEFINITIONS key from an override string
        which might have a variant suffix (e.g., "MAP_COL-1" -> "MAP_COL").

        Returns the matching FTD key, or None when neither the string nor its
        suffix-stripped form is a known key.
        """
        if not override_string:  # Handle empty or None override_string
            return None
        if override_string in self.config_obj.FILE_TYPE_DEFINITIONS:
            return override_string
        # Regex to remove a single trailing "-suffix" or "_suffix" variant marker,
        # e.g., "MAP_COL-1" -> "MAP_COL", "MAP_ROUGH_variantA" -> "MAP_ROUGH"
        base_candidate = re.sub(r"(-[\w\d]+|_[\w\d]+)$", "", override_string)
        if base_candidate in self.config_obj.FILE_TYPE_DEFINITIONS:
            return base_candidate
        # log.debug(f"Could not derive FTD key from override_string '{override_string}'. Tried '{base_candidate}'.")
        # Neither the raw string nor its suffix-stripped form is a known key.
        return None

    def _get_map_variant_suffix(self, map_identifier: str, base_ftd_key: str) -> str:
        """
        Extracts a variant suffix (e.g., "-1", "_variantA") from a map_identifier
        if the base_ftd_key is a prefix of it and the suffix indicates a variant.

        Example: map_identifier="MAP_COL-1", base_ftd_key="MAP_COL" -> returns "-1"
                 map_identifier="MAP_COL_variant", base_ftd_key="MAP_COL" -> returns "_variant"
                 map_identifier="MAP_COL", base_ftd_key="MAP_COL" -> returns ""
        """
        if not base_ftd_key:  # Ensure base_ftd_key is not empty
            return ""
        if map_identifier.startswith(base_ftd_key):
            suffix = map_identifier[len(base_ftd_key):]
            # Ensure suffix looks like a variant (starts with - or _) or is empty
            if not suffix or suffix.startswith(('-', '_')):
                return suffix
        return ""  # Default to no suffix

    def _get_base_map_type(self, map_identifier: str) -> str:
        """
        Gets the base standard type (e.g., "COL") from a map identifier
        (e.g., "MAP_COL-1", "COL-1"), or returns the identifier itself if it's a
        merged type (e.g., "NRMRGH") or not resolvable to a standard type.
        """
        if not map_identifier:  # Handle empty or None map_identifier
            return ""
        # Try to get FTD key from "MAP_COL-1" -> "MAP_COL" or "MAP_COL" -> "MAP_COL"
        ftd_key = self._get_ftd_key_from_override(map_identifier)
        if ftd_key:
            definition = self.config_obj.FILE_TYPE_DEFINITIONS.get(ftd_key)
            if definition and definition.get("standard_type"):  # Check if standard_type exists and is not empty
                return definition["standard_type"]  # Returns "COL"
        # If map_identifier was like "COL-1" or "ROUGH" (a standard_type itself, possibly with suffix)
        # Strip suffix and check if the base is a known standard_type
        # Regex to get the initial part of the string composed of uppercase letters and underscores
        base_candidate_match = re.match(r"([A-Z_]+)", map_identifier.upper())
        if base_candidate_match:
            potential_std_type = base_candidate_match.group(1)
            for _, definition_val in self.config_obj.FILE_TYPE_DEFINITIONS.items():
                if definition_val.get("standard_type") == potential_std_type:
                    return potential_std_type  # Found "COL"
        # If it's a merged map type (e.g., "NRMRGH"), it won't be in FTDs as a key or standard_type.
        # Check if it's one of the output_map_types from MAP_MERGE_RULES.
        for rule in self.config_obj.map_merge_rules:
            if rule.get("output_map_type") == map_identifier:
                return map_identifier  # Return "NRMRGH" as is
        # Fallback: return the original identifier, uppercased.
        log.debug(f"_get_base_map_type: Could not determine standard base for '{map_identifier}'. Returning as is (uppercase).")
        return map_identifier.upper()

    def _load_and_transform_source(self, source_path_abs: Path, map_type: str, target_resolution_key: str, is_gloss_source: bool) -> Tuple[Optional[np.ndarray], Optional[np.dtype]]:
        """
        Loads a source image file, performs initial prep (BGR->RGB, Gloss->Rough),
        resizes it to the target resolution, and caches the result.
        Uses static configuration from self.config_obj.

        Args:
            source_path_abs: Absolute path to the source file in the workspace.
            map_type: The item_type_override (e.g., "MAP_NRM", "MAP_ROUGH-1").
            target_resolution_key: The key for the target resolution (e.g., "4K").
            is_gloss_source: Boolean indicating if this source should be treated as gloss for inversion.

        Returns:
            Tuple containing:
            - Resized NumPy array (float32 for gloss-inverted, original type otherwise) or None if loading/processing fails.
            - Original source NumPy dtype or None if loading fails.
        """
        if cv2 is None or np is None:
            log.error("OpenCV or NumPy not available for image loading.")
            return None, None
        cache_key = (source_path_abs, target_resolution_key)  # Use absolute path for cache key
        if cache_key in self.loaded_data_cache:
            log.debug(f"CACHE HIT: Returning cached data for {source_path_abs.name} at {target_resolution_key}")
            return self.loaded_data_cache[cache_key]  # Return tuple (image_data, source_dtype)
        log.debug(f"CACHE MISS: Loading and transforming {source_path_abs.name} for {target_resolution_key} (map_type: {map_type})")
        img_prepared = None
        source_dtype = None
        try:
            # --- 1. Load Source Image ---
            # Determine read flag based on is_grayscale from FTD
            ftd_key = self._get_ftd_key_from_override(map_type)  # map_type is item_type_override
            is_map_grayscale = False
            standard_type_for_checks = None  # For MASK check
            if ftd_key:
                ftd_definition = self.config_obj.FILE_TYPE_DEFINITIONS.get(ftd_key, {})
                is_map_grayscale = ftd_definition.get("is_grayscale", False)
                standard_type_for_checks = ftd_definition.get("standard_type")
                log.debug(f"For map_type '{map_type}' (FTD key '{ftd_key}'), is_grayscale: {is_map_grayscale}, standard_type: {standard_type_for_checks}")
            else:
                log.warning(f"Could not determine FTD key for map_type '{map_type}' to check is_grayscale. Assuming not grayscale.")
            read_flag = cv2.IMREAD_GRAYSCALE if is_map_grayscale else cv2.IMREAD_UNCHANGED
            # Special case for MASK: always load unchanged first to check alpha
            if standard_type_for_checks == 'MASK':
                log.debug(f"Map type '{map_type}' (standard_type 'MASK') will be loaded with IMREAD_UNCHANGED for alpha check.")
                read_flag = cv2.IMREAD_UNCHANGED
            log.debug(f"Loading source {source_path_abs.name} with flag: {'GRAYSCALE' if read_flag == cv2.IMREAD_GRAYSCALE else 'UNCHANGED'}")
            img_loaded = cv2.imread(str(source_path_abs), read_flag)
            if img_loaded is None:
                raise ProcessingEngineError(f"Failed to load image file: {source_path_abs.name} with flag {read_flag}")
            source_dtype = img_loaded.dtype
            log.debug(f"Loaded source {source_path_abs.name}, dtype: {source_dtype}, shape: {img_loaded.shape}")

            # --- 2. Initial Preparation (BGR->RGB, Gloss Inversion, MASK handling) ---
            img_prepared = img_loaded  # Start with loaded image
            # MASK Handling (Extract alpha or convert) - Do this BEFORE general color conversions
            if standard_type_for_checks == 'MASK':
                log.debug(f"Processing as MASK type for {source_path_abs.name}.")
                shape = img_prepared.shape
                if len(shape) == 3 and shape[2] == 4:  # BGRA or RGBA (OpenCV loads BGRA)
                    log.debug("MASK processing: Extracting alpha channel (4-channel source).")
                    img_prepared = img_prepared[:, :, 3]  # Extract alpha
                elif len(shape) == 3 and shape[2] == 3:  # BGR or RGB
                    log.debug("MASK processing: Converting 3-channel source to Grayscale.")
                    img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGR2GRAY if read_flag != cv2.IMREAD_GRAYSCALE else cv2.COLOR_RGB2GRAY)  # If loaded UNCHANGED and 3-channel, assume BGR
                elif len(shape) == 2:
                    log.debug("MASK processing: Source is already grayscale.")
                else:
                    log.warning(f"MASK processing: Unexpected source shape {shape}. Cannot reliably extract mask.")
                    img_prepared = None  # Cannot process
            else:
                # BGR -> RGB conversion (only for 3/4-channel images not loaded as grayscale)
                if len(img_prepared.shape) == 3 and img_prepared.shape[2] >= 3 and read_flag != cv2.IMREAD_GRAYSCALE:
                    log.debug(f"Converting loaded image from BGR to RGB for {source_path_abs.name}.")
                    if img_prepared.shape[2] == 4:  # BGRA -> RGBA (then to RGB)
                        img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGRA2RGB)  # OpenCV BGRA to RGB
                    else:  # BGR -> RGB
                        img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGR2RGB)
                elif len(img_prepared.shape) == 2:
                    log.debug(f"Image {source_path_abs.name} is grayscale or loaded as such, no BGR->RGB conversion needed.")
            if img_prepared is None:
                raise ProcessingEngineError("Image data is None after MASK/Color prep.")

            # Gloss -> Roughness Inversion
            # map_type is item_type_override, e.g. "MAP_ROUGH-1"
            # standard_type_for_checks is "ROUGH"
            if standard_type_for_checks == 'ROUGH' and is_gloss_source:
                log.info(f"Performing Gloss->Roughness inversion for {source_path_abs.name} (map_type: {map_type})")
                if len(img_prepared.shape) == 3:
                    log.debug("Gloss Inversion: Converting 3-channel image to grayscale before inversion.")
                    img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_RGB2GRAY)  # Should be RGB at this point if 3-channel
                stats_before = _calculate_image_stats(img_prepared)
                log.debug(f"Gloss Inversion: Image stats BEFORE inversion: {stats_before}")
                # Normalise by the ORIGINAL source dtype range, then invert (1 - x).
                if source_dtype == np.uint16:
                    img_float = 1.0 - (img_prepared.astype(np.float32) / 65535.0)
                elif source_dtype == np.uint8:
                    img_float = 1.0 - (img_prepared.astype(np.float32) / 255.0)
                else:
                    # Assuming float input is already 0-1 range
                    img_float = 1.0 - img_prepared.astype(np.float32)
                img_prepared = np.clip(img_float, 0.0, 1.0)  # Result is float32
                stats_after = _calculate_image_stats(img_prepared)
                log.debug(f"Gloss Inversion: Image stats AFTER inversion (float32): {stats_after}")
                log.debug(f"Inverted gloss map stored as float32 for ROUGH, original dtype: {source_dtype}")

            # Ensure data is float32/uint8/uint16 for resizing compatibility
            if isinstance(img_prepared, np.ndarray) and img_prepared.dtype not in [np.uint8, np.uint16, np.float32, np.float16]:
                log.warning(f"Converting unexpected dtype {img_prepared.dtype} to float32 before resizing for {source_path_abs.name}.")
                img_prepared = img_prepared.astype(np.float32)

            # --- 3. Resize ---
            if img_prepared is None:
                raise ProcessingEngineError(f"Image data is None after initial prep for {source_path_abs.name}.")
            orig_h, orig_w = img_prepared.shape[:2]
            # Get resolutions from static config
            target_dim_px = self.config_obj.image_resolutions.get(target_resolution_key)
            if not target_dim_px:
                raise ProcessingEngineError(f"Target resolution key '{target_resolution_key}' not found in config.")
            # Avoid upscaling check (using static config)
            max_original_dimension = max(orig_w, orig_h)
            # TODO: Add config option for allowing upscale? For now, skip if target > original.
            if target_dim_px > max_original_dimension:
                log.warning(f"Target dimension {target_dim_px}px is larger than original {max_original_dimension}px for {source_path_abs.name}. Skipping resize for {target_resolution_key}.")
                # Store None in cache for this specific resolution to avoid retrying
                self.loaded_data_cache[cache_key] = (None, source_dtype)
                return None, source_dtype  # Indicate resize was skipped
            if orig_w <= 0 or orig_h <= 0:
                raise ProcessingEngineError(f"Invalid original dimensions ({orig_w}x{orig_h}) for {source_path_abs.name}.")
            target_w, target_h = calculate_target_dimensions(orig_w, orig_h, target_dim_px)
            # Lanczos for downscale, cubic otherwise.
            interpolation = cv2.INTER_LANCZOS4 if (target_w * target_h) < (orig_w * orig_h) else cv2.INTER_CUBIC
            log.debug(f"Resizing {source_path_abs.name} from ({orig_w}x{orig_h}) to ({target_w}x{target_h}) for {target_resolution_key}")
            img_resized = cv2.resize(img_prepared, (target_w, target_h), interpolation=interpolation)

            # --- 4. Cache and Return ---
            # Keep resized dtype unless it was gloss-inverted (which is float32)
            final_data_to_cache = img_resized
            # NOTE(review): this checks map_type.startswith('ROUGH'), while the
            # inversion above keyed on standard_type_for_checks == 'ROUGH' — for
            # map_type like "MAP_ROUGH-1" this branch never fires; confirm intent.
            if map_type.startswith('ROUGH') and is_gloss_source and final_data_to_cache.dtype != np.float32:
                final_data_to_cache = final_data_to_cache.astype(np.float32)
            log.debug(f"CACHING result for {cache_key}. Shape: {final_data_to_cache.shape}, Dtype: {final_data_to_cache.dtype}")
            self.loaded_data_cache[cache_key] = (final_data_to_cache, source_dtype)
            return final_data_to_cache, source_dtype
        except Exception as e:
            log.error(f"Error in _load_and_transform_source for {source_path_abs.name} at {target_resolution_key}: {e}", exc_info=True)
            # Cache None to prevent retrying on error for this specific key
            self.loaded_data_cache[cache_key] = (None, None)
            return None, None

    def _save_image(self, image_data: np.ndarray, supplier_name: str, asset_name: str, current_map_identifier: str, resolution_key: str, source_info: dict, output_bit_depth_rule: str) -> Optional[Dict]:
        """
        Handles saving an image NumPy array to a temporary file within the engine's
        temp_dir using token-based path generation.
        Uses static configuration from self.config_obj for formats, quality, etc.
        The 'maptype' token for the filename is derived based on standard_type and variants.

        Args:
            image_data: NumPy array containing the image data to save.
            supplier_name: The effective supplier name for the asset.
            asset_name: The name of the asset.
            current_map_identifier: The map type being saved (e.g., "MAP_COL", "MAP_ROUGH-1", "NRMRGH").
                This is item_type_override or merged map type.
            resolution_key: The resolution key (e.g., "4K").
            source_info: Dictionary containing details about the source(s).
            output_bit_depth_rule: Rule for determining output bit depth.

        Returns:
            A dictionary containing details of the saved file or None if saving failed.
""" if cv2 is None or np is None: log.error("OpenCV or NumPy not available for image saving.") return None if image_data is None: log.error(f"Cannot save image for {current_map_identifier} ({resolution_key}): image_data is None.") return None if not self.temp_dir or not self.temp_dir.exists(): log.error(f"Cannot save image for {current_map_identifier} ({resolution_key}): Engine temp_dir is invalid.") return None try: h, w = image_data.shape[:2] current_dtype = image_data.dtype log.debug(f"Saving {current_map_identifier} ({resolution_key}) for asset '{asset_name}'. Input shape: {image_data.shape}, dtype: {current_dtype}") config = self.config_obj primary_fmt_16, fallback_fmt_16 = config.get_16bit_output_formats() fmt_8bit_config = config.get_8bit_output_format() threshold = config.resolution_threshold_for_jpg force_lossless_map_types = config.force_lossless_map_types # Should contain standard_types like "NRM", "DISP" jpg_quality = config.jpg_quality png_compression_level = config._core_settings.get('PNG_COMPRESSION_LEVEL', 6) image_resolutions = config.image_resolutions output_directory_pattern = config.get('OUTPUT_DIRECTORY_PATTERN', '[supplier]/[assetname]') output_filename_pattern = config.get('OUTPUT_FILENAME_PATTERN', '[assetname]_[maptype]_[resolution].[ext]') # --- 1. 
Determine Output Bit Depth --- source_bpc = source_info.get('source_bit_depth', 8) max_input_bpc = source_info.get('max_input_bit_depth', source_bpc) output_dtype_target, output_bit_depth = np.uint8, 8 if output_bit_depth_rule == 'force_8bit': output_dtype_target, output_bit_depth = np.uint8, 8 elif output_bit_depth_rule == 'force_16bit': output_dtype_target, output_bit_depth = np.uint16, 16 elif output_bit_depth_rule == 'respect': if source_bpc == 16: output_dtype_target, output_bit_depth = np.uint16, 16 elif output_bit_depth_rule == 'respect_inputs': if max_input_bpc == 16: output_dtype_target, output_bit_depth = np.uint16, 16 else: log.warning(f"Unknown output_bit_depth_rule '{output_bit_depth_rule}'. Defaulting to 8-bit.") output_dtype_target, output_bit_depth = np.uint8, 8 log.debug(f"Target output bit depth: {output_bit_depth}-bit for {current_map_identifier}") # --- 2. Determine Output Format --- output_format, output_ext, save_params, needs_float16 = "", "", [], False # Use the (potentially suffixed) standard_type for lossless check base_standard_type_for_lossless_check = self._get_base_map_type(current_map_identifier) # "COL", "NRM", "DISP-Detail" -> "DISP" # Check if the pure standard type (without suffix) is in force_lossless_map_types pure_standard_type = self._get_ftd_key_from_override(base_standard_type_for_lossless_check) # Get FTD key if possible std_type_from_ftd = None if pure_standard_type and pure_standard_type in self.config_obj.FILE_TYPE_DEFINITIONS: std_type_from_ftd = self.config_obj.FILE_TYPE_DEFINITIONS[pure_standard_type].get("standard_type") # Use std_type_from_ftd if available and non-empty, else base_standard_type_for_lossless_check check_type_for_lossless = std_type_from_ftd if std_type_from_ftd else base_standard_type_for_lossless_check force_lossless = check_type_for_lossless in force_lossless_map_types original_extension = source_info.get('original_extension', '.png') involved_extensions = source_info.get('involved_extensions', 
{original_extension}) target_dim_px = image_resolutions.get(resolution_key, 0) if force_lossless: log.debug(f"Format forced to lossless for map type '{current_map_identifier}' (checked as '{check_type_for_lossless}').") if output_bit_depth == 16: output_format = primary_fmt_16 if output_format.startswith("exr"): output_ext, needs_float16 = ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF]) else: output_format = fallback_fmt_16 if fallback_fmt_16 == "png" else "png"; output_ext = ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) else: output_format, output_ext = "png", ".png"; save_params = [cv2.IMWRITE_PNG_COMPRESSION, png_compression_level] elif output_bit_depth == 8 and target_dim_px >= threshold: output_format = 'jpg'; output_ext = '.jpg'; save_params.extend([cv2.IMWRITE_JPEG_QUALITY, jpg_quality]) else: highest_format_str = 'jpg' if '.exr' in relevant_extensions: highest_format_str = 'exr' elif '.tif' in relevant_extensions: highest_format_str = 'tif' elif '.png' in relevant_extensions: highest_format_str = 'png' if highest_format_str == 'exr': if output_bit_depth == 16: output_format, output_ext, needs_float16 = "exr", ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF]) else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) elif highest_format_str == 'tif' or highest_format_str == 'png': # Treat TIF like PNG for output choice here if output_bit_depth == 16: output_format = primary_fmt_16 if output_format.startswith("exr"): output_ext, needs_float16 = ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF]) else: output_format = "png"; output_ext = ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) else: # Default to 
configured 8-bit format output_format = fmt_8bit_config; output_ext = f".{output_format}" if output_format == "png": save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_compression_level]) elif output_format == "jpg": save_params.extend([cv2.IMWRITE_JPEG_QUALITY, jpg_quality]) if output_format == "jpg" and output_bit_depth == 16: log.warning(f"Output format JPG, but target 16-bit. Forcing 8-bit for {current_map_identifier}.") output_dtype_target, output_bit_depth = np.uint8, 8 log.debug(f"Determined save format for {current_map_identifier}: {output_format}, ext: {output_ext}, bit_depth: {output_bit_depth}") # --- 3. Final Data Type Conversion --- img_to_save = image_data.copy() if output_dtype_target == np.uint8 and img_to_save.dtype != np.uint8: if img_to_save.dtype == np.uint16: img_to_save = (img_to_save.astype(np.float32) / 65535.0 * 255.0).astype(np.uint8) elif img_to_save.dtype in [np.float16, np.float32]: img_to_save = (np.clip(img_to_save, 0.0, 1.0) * 255.0).astype(np.uint8) else: img_to_save = img_to_save.astype(np.uint8) elif output_dtype_target == np.uint16 and img_to_save.dtype != np.uint16: if img_to_save.dtype == np.uint8: img_to_save = img_to_save.astype(np.uint16) * 257 elif img_to_save.dtype in [np.float16, np.float32]: img_to_save = (np.clip(img_to_save, 0.0, 1.0) * 65535.0).astype(np.uint16) else: img_to_save = img_to_save.astype(np.uint16) if needs_float16 and img_to_save.dtype != np.float16: if img_to_save.dtype == np.uint16: img_to_save = (img_to_save.astype(np.float32) / 65535.0).astype(np.float16) elif img_to_save.dtype == np.uint8: img_to_save = (img_to_save.astype(np.float32) / 255.0).astype(np.float16) elif img_to_save.dtype == np.float32: img_to_save = img_to_save.astype(np.float16) else: log.warning(f"Cannot convert {img_to_save.dtype} to float16 for EXR save."); return None # --- 4. 
Final Color Space Conversion (RGB -> BGR for non-EXR) --- img_save_final = img_to_save if len(img_to_save.shape) == 3 and img_to_save.shape[2] == 3 and not output_format.startswith("exr"): try: img_save_final = cv2.cvtColor(img_to_save, cv2.COLOR_RGB2BGR) except Exception as cvt_err: log.error(f"RGB->BGR conversion failed for {current_map_identifier}: {cvt_err}. Saving original."); # --- 5. Determine maptype token for filename --- filename_map_type_token: str is_merged_map = any(rule.get("output_map_type") == current_map_identifier for rule in self.config_obj.map_merge_rules) if is_merged_map: filename_map_type_token = current_map_identifier # e.g., "NRMRGH" else: base_ftd_key = self._get_ftd_key_from_override(current_map_identifier) # e.g., "MAP_COL" if base_ftd_key: definition = self.config_obj.FILE_TYPE_DEFINITIONS.get(base_ftd_key) if definition and "standard_type" in definition: standard_type_alias = definition["standard_type"] # e.g., "COL" if standard_type_alias: # Ensure not empty variant_suffix = self._get_map_variant_suffix(current_map_identifier, base_ftd_key) # e.g., "-1" or "" if standard_type_alias in self.config_obj.respect_variant_map_types: filename_map_type_token = standard_type_alias + variant_suffix # e.g., "COL-1" else: filename_map_type_token = standard_type_alias # e.g., "COL" else: log.warning(f"Empty standard_type for FTD key '{base_ftd_key}'. Using identifier '{current_map_identifier}' for maptype token.") filename_map_type_token = current_map_identifier else: log.warning(f"No definition or standard_type for FTD key '{base_ftd_key}'. Using identifier '{current_map_identifier}' for maptype token.") filename_map_type_token = current_map_identifier else: log.warning(f"Could not derive FTD key from '{current_map_identifier}'. Using it directly for maptype token.") filename_map_type_token = current_map_identifier log.debug(f"Filename maptype token for '{current_map_identifier}' is '{filename_map_type_token}'") # --- 6. 
Construct Path using Token Pattern & Save --- token_data = { "supplier": _sanitize_filename(supplier_name), "assetname": _sanitize_filename(asset_name), "maptype": filename_map_type_token, # Use the derived token "resolution": resolution_key, "width": w, "height": h, "bitdepth": output_bit_depth, "ext": output_ext.lstrip('.') } if hasattr(self, 'current_incrementing_value') and self.current_incrementing_value is not None: token_data['incrementingvalue'] = self.current_incrementing_value if hasattr(self, 'current_sha5_value') and self.current_sha5_value is not None: token_data['sha5'] = self.current_sha5_value try: relative_dir_path_str = generate_path_from_pattern(output_directory_pattern, token_data) filename_str = generate_path_from_pattern(output_filename_pattern, token_data) full_relative_path_str = str(Path(relative_dir_path_str) / filename_str) except Exception as path_gen_err: log.error(f"Failed to generate output path for {current_map_identifier} with data {token_data}: {path_gen_err}", exc_info=True) return None output_path_temp = self.temp_dir / full_relative_path_str log.debug(f"Attempting to save {current_map_identifier} to temporary path: {output_path_temp}") try: output_path_temp.parent.mkdir(parents=True, exist_ok=True) except Exception as mkdir_err: log.error(f"Failed to create temporary directory {output_path_temp.parent}: {mkdir_err}", exc_info=True) return None saved_successfully = False actual_format_saved = output_format try: cv2.imwrite(str(output_path_temp), img_save_final, save_params) saved_successfully = True log.info(f" > Saved {map_type} ({resolution_key}, {output_bit_depth}-bit) as {output_format}") except Exception as save_err: log.error(f"Save failed ({output_format}) for {map_type} {resolution_key}: {save_err}") # --- Try Fallback --- if output_bit_depth == 16 and output_format.startswith("exr") and fallback_fmt_16 != output_format and fallback_fmt_16 == "png": log.warning(f"Attempting fallback PNG save for {map_type} 
{resolution_key}") actual_format_saved = "png"; output_ext = ".png" # Regenerate path with .png extension for fallback token_data_fallback = token_data.copy() token_data_fallback["ext"] = "png" try: # Regenerate directory and filename separately for fallback relative_dir_path_str_fb = generate_path_from_pattern(output_directory_pattern, token_data_fallback) filename_str_fb = generate_path_from_pattern(output_filename_pattern, token_data_fallback) full_relative_path_str_fb = str(Path(relative_dir_path_str_fb) / filename_str_fb) output_path_temp = self.temp_dir / full_relative_path_str_fb # Update temp path for fallback output_path_temp.parent.mkdir(parents=True, exist_ok=True) # Ensure dir exists except Exception as path_gen_err_fb: log.error(f"Failed to generate fallback PNG path: {path_gen_err_fb}", exc_info=True) return None # Cannot save fallback without path save_params_fallback = [cv2.IMWRITE_PNG_COMPRESSION, png_compression_level] img_fallback = None; target_fallback_dtype = np.uint16 # Convert original data (before float16 conversion) to uint16 for PNG fallback if img_to_save.dtype == np.float16: # This means original was likely float or uint16/8 converted to float16 # Safest is to convert the float16 back to uint16 img_scaled = np.clip(img_to_save.astype(np.float32) * 65535.0, 0, 65535) img_fallback = img_scaled.astype(target_fallback_dtype) elif img_to_save.dtype == target_fallback_dtype: img_fallback = img_to_save # Already uint16 else: log.error(f"Cannot convert {img_to_save.dtype} for PNG fallback."); return None # --- Conditional RGB -> BGR Conversion for fallback --- img_fallback_save_final = img_fallback is_3_channel_fallback = len(img_fallback.shape) == 3 and img_fallback.shape[2] == 3 if is_3_channel_fallback: # PNG is non-EXR log.debug(f"Converting RGB to BGR for fallback PNG save {map_type} ({resolution_key})") try: img_fallback_save_final = cv2.cvtColor(img_fallback, cv2.COLOR_RGB2BGR) except Exception as cvt_err_fb: log.error(f"Failed RGB->BGR 
conversion for fallback PNG: {cvt_err_fb}. Saving original.") try: cv2.imwrite(str(output_path_temp), img_fallback_save_final, save_params_fallback) saved_successfully = True log.info(f" > Saved {map_type} ({resolution_key}) using fallback PNG") except Exception as fallback_err: log.error(f"Fallback PNG save failed for {map_type} {resolution_key}: {fallback_err}", exc_info=True) else: log.error(f"No suitable fallback available or applicable for failed save of {map_type} ({resolution_key}) as {output_format}.") # --- 6. Return Result --- if saved_successfully: # Return the full relative path string generated by the patterns final_relative_path_str = full_relative_path_str_fb if actual_format_saved == "png" and output_format.startswith("exr") else full_relative_path_str return { "path": final_relative_path_str, # Store relative path string "resolution": resolution_key, "width": w, "height": h, "bit_depth": output_bit_depth, "format": actual_format_saved } else: return None # Indicate save failure except Exception as e: log.error(f"Unexpected error in _save_image for {map_type} ({resolution_key}): {e}", exc_info=True) return None def _process_individual_maps(self, asset_rule: AssetRule, workspace_path: Path, current_asset_metadata: Dict) -> Tuple[Dict[str, Dict[str, Dict]], Dict[str, Dict], str]: """ Processes, resizes, and saves individual map files for a specific asset based on the provided AssetRule and static configuration. Args: asset_rule: The AssetRule object containing file rules for this asset. workspace_path: Path to the directory containing the source files. current_asset_metadata: Mutable metadata dictionary for the current asset (updated directly). Returns: Tuple containing: - processed_maps_details_asset: Dict mapping map_type to resolution details. - image_stats_asset: Dict mapping map_type to calculated image statistics (also added to current_asset_metadata). 
- aspect_ratio_change_string_asset: String indicating aspect ratio change (also added to current_asset_metadata).
    """
    if not self.temp_dir:
        raise ProcessingEngineError("Engine workspace (temp_dir) not setup.")
    asset_name = asset_rule.asset_name
    log.info(f"Processing individual map files for asset '{asset_name}'...")
    # Initialize results specific to this asset
    processed_maps_details_asset: Dict[str, Dict[str, Dict]] = defaultdict(dict)
    image_stats_asset: Dict[str, Dict] = {}  # Local dict for stats
    map_details_asset: Dict[str, Dict] = {}  # Store details like source bit depth, gloss inversion
    aspect_ratio_change_string_asset: str = "N/A"
    # --- Settings retrieval from static config ---
    resolutions = self.config_obj.image_resolutions
    stats_res_key = self.config_obj.calculate_stats_resolution
    stats_target_dim = resolutions.get(stats_res_key)
    if not stats_target_dim:
        log.warning(f"Stats resolution key '{stats_res_key}' not found in config. Stats skipped for '{asset_name}'.")
    base_name = asset_name  # Use the asset name from the rule
    # --- Aspect Ratio Calculation Setup ---
    # Pick the first non-EXTRA map with an override to probe original dimensions.
    first_map_rule_for_aspect = next((fr for fr in asset_rule.files if fr.item_type_override is not None and fr.item_type_override != "EXTRA"), None)  # Exclude EXTRA
    orig_w_aspect, orig_h_aspect = None, None
    if first_map_rule_for_aspect:
        first_res_key = next(iter(resolutions))  # Use first resolution key
        source_path_abs = workspace_path / first_map_rule_for_aspect.file_path
        temp_img_for_dims, _ = self._load_and_transform_source(
            source_path_abs,
            first_map_rule_for_aspect.item_type_override,
            first_res_key,
            is_gloss_source=False,  # Added: Not relevant for dimension check, but required by method
            # self.loaded_data_cache is used internally by the method
        )
        if temp_img_for_dims is not None:
            orig_h_aspect, orig_w_aspect = temp_img_for_dims.shape[:2]
            log.debug(f"Got original dimensions ({orig_w_aspect}x{orig_h_aspect}) for aspect ratio calculation from {first_map_rule_for_aspect.file_path}")
        else:
            log.warning(f"Could not load image {first_map_rule_for_aspect.file_path} to get original dimensions for aspect ratio.")
    else:
        log.warning("No map files found in AssetRule, cannot calculate aspect ratio string.")
    # --- Process Each Individual Map defined in the AssetRule ---
    for file_rule in asset_rule.files:
        # --- Check if this file should be processed individually ---
        # Skip if no item type is assigned, if it's explicitly "EXTRA", or if marked to skip
        # Check if this file should be processed individually or skipped
        should_skip = (
            file_rule.item_type_override is None or
            file_rule.item_type_override == "EXTRA" or  # Explicitly skip "EXTRA" type
            getattr(file_rule, 'skip_processing', False) or
            file_rule.item_type == "FILE_IGNORE"  # Consolidated check: Use item_type for base classification
        )
        if should_skip:
            # Collect every matching reason for the debug log (not just the first)
            skip_reason = []
            if file_rule.item_type_override is None:
                skip_reason.append("No ItemTypeOverride")
            if file_rule.item_type_override == "EXTRA":
                skip_reason.append("Explicitly EXTRA type")
            if getattr(file_rule, 'skip_processing', False):
                skip_reason.append("SkipProcessing flag set")
            if file_rule.item_type == "FILE_IGNORE":
                skip_reason.append("ItemType is FILE_IGNORE")
            log.debug(f"Skipping individual processing for {file_rule.file_path} ({', '.join(skip_reason)})")
            continue  # Skip to the next file_rule
        # --- Proceed with processing for this file_rule ---
        source_path_rel = Path(file_rule.file_path)  # Ensure it's a Path object
        # IMPORTANT: Use the ENGINE's workspace_path (self.temp_dir) for loading,
        # as individual maps should have been copied there by the caller (ProcessingTask)
        # Correction: _process_individual_maps receives the *engine's* temp_dir as workspace_path
        source_path_abs = workspace_path / source_path_rel
        map_type = file_rule.item_type_override  # Use the explicit map type from the rule
        # Determine if the source is gloss based on the flag set during prediction
        # is_gloss_source = map_type in gloss_identifiers # <<< INCORRECT: Re-calculates based on target type
        is_gloss_source = getattr(file_rule, 'is_gloss_source', False)  # <<< CORRECT: Use flag from FileRule object
        log.debug(f"Using is_gloss_source={is_gloss_source} directly from FileRule for {file_rule.file_path}")  # DEBUG ADDED
        original_extension = source_path_rel.suffix.lower()  # Get from path
        log.info(f"-- Asset '{asset_name}': Processing Individual Map: {map_type} (Source: {source_path_rel.name}, IsGlossSource: {is_gloss_source}) --")  # DEBUG: Added flag to log
        current_map_details = {"derived_from_gloss": is_gloss_source}
        source_bit_depth_found = None  # Track if we've found the bit depth for this map type
        # Per-map failures are recorded in the details dict and do not abort the asset.
        try:
            # --- Loop through target resolutions from static config ---
            for res_key, target_dim_px in resolutions.items():
                log.debug(f"Processing {map_type} for resolution: {res_key}...")
                # --- 1. Load and Transform Source (using helper + cache) ---
                # This now only runs for files that have an item_type_override
                img_resized, source_dtype = self._load_and_transform_source(
                    source_path_abs=source_path_abs,
                    map_type=map_type,  # Pass the specific map type (e.g., ROUGH-1)
                    target_resolution_key=res_key,
                    is_gloss_source=is_gloss_source
                    # self.loaded_data_cache is used internally
                )
                if img_resized is None:
                    # This warning now correctly indicates a failure for a map we *intended* to process
                    log.warning(f"Failed to load/transform source map {source_path_rel} for {res_key}. Skipping resolution.")
                    continue  # Skip this resolution
                # Store source bit depth once found
                if source_dtype is not None and source_bit_depth_found is None:
                    source_bit_depth_found = 16 if source_dtype == np.uint16 else (8 if source_dtype == np.uint8 else 8)  # Default non-uint to 8
                    current_map_details["source_bit_depth"] = source_bit_depth_found
                    log.debug(f"Stored source bit depth for {map_type}: {source_bit_depth_found}")
                # --- 2. Calculate Stats (if applicable) ---
                if res_key == stats_res_key and stats_target_dim:
                    log.debug(f"Calculating stats for {map_type} using {res_key} image...")
                    stats = _calculate_image_stats(img_resized)
                    if stats:
                        image_stats_asset[map_type] = stats  # Store locally first
                    else:
                        log.warning(f"Stats calculation failed for {map_type} at {res_key}.")
                # --- 3. Calculate Aspect Ratio Change String (once per asset) ---
                if aspect_ratio_change_string_asset == "N/A" and orig_w_aspect is not None and orig_h_aspect is not None:
                    target_w_aspect, target_h_aspect = img_resized.shape[1], img_resized.shape[0]  # Use current resized dims
                    try:
                        aspect_string = _normalize_aspect_ratio_change(orig_w_aspect, orig_h_aspect, target_w_aspect, target_h_aspect)
                        aspect_ratio_change_string_asset = aspect_string
                        log.debug(f"Stored aspect ratio change string using {res_key}: '{aspect_string}'")
                    except Exception as aspect_err:
                        log.error(f"Failed to calculate aspect ratio change string using {res_key}: {aspect_err}", exc_info=True)
                        aspect_ratio_change_string_asset = "Error"
                elif aspect_ratio_change_string_asset == "N/A":
                    aspect_ratio_change_string_asset = "Unknown"  # Set to unknown if original dims failed
                # --- 4. Save Image (using helper) ---
                source_info = {
                    'original_extension': original_extension,
                    'source_bit_depth': source_bit_depth_found or 8,  # Use found depth or default
                    'involved_extensions': {original_extension}  # Only self for individual maps
                }
                # Get bit depth rule solely from the static configuration using the correct method signature
                bit_depth_rule = self.config_obj.get_bit_depth_rule(map_type)  # Pass only map_type
                # Determine the map_type to use for saving (use item_type_override)
                save_map_type = file_rule.item_type_override
                # If item_type_override is None, this file shouldn't be saved as an individual map.
                # This case should ideally be caught by the skip logic earlier, but adding a check here for safety.
                if save_map_type is None:
                    log.warning(f"Skipping save for {file_rule.file_path}: item_type_override is None.")
                    continue  # Skip saving this file
                # Get supplier name from metadata (set in process method)
                supplier_name = current_asset_metadata.get("supplier_name", "UnknownSupplier")
                save_result = self._save_image(
                    image_data=img_resized,
                    supplier_name=supplier_name,
                    asset_name=base_name,
                    current_map_identifier=save_map_type,  # Pass the map type to be saved
                    resolution_key=res_key,
                    source_info=source_info,
                    output_bit_depth_rule=bit_depth_rule
                )
                # --- 5. Store Result ---
                if save_result:
                    processed_maps_details_asset.setdefault(map_type, {})[res_key] = save_result
                    # Update overall map detail (e.g., final format) if needed
                    current_map_details["output_format"] = save_result.get("format")
                else:
                    log.error(f"Failed to save {map_type} at {res_key}.")
                    processed_maps_details_asset.setdefault(map_type, {})[f'error_{res_key}'] = "Save failed"
        except Exception as map_proc_err:
            log.error(f"Failed processing map {map_type} from {source_path_rel.name}: {map_proc_err}", exc_info=True)
            processed_maps_details_asset.setdefault(map_type, {})['error'] = str(map_proc_err)
        # Store collected details for this map type
        map_details_asset[map_type] = current_map_details
    # --- Final Metadata Updates ---
    # Update the passed-in current_asset_metadata dictionary directly
    # NOTE(review): the metadata key is hard-coded "image_stats_1k" even though
    # stats_res_key is configurable — confirm downstream consumers before renaming.
    current_asset_metadata["map_details"] = map_details_asset
    current_asset_metadata["image_stats_1k"] = image_stats_asset  # Add collected stats
    current_asset_metadata["aspect_ratio_change_string"] = aspect_ratio_change_string_asset  # Add collected aspect string
    log.info(f"Finished processing individual map files for asset '{asset_name}'.")
    # Return details needed for organization, stats and aspect ratio are updated in-place
    return processed_maps_details_asset, image_stats_asset, aspect_ratio_change_string_asset

def _merge_maps(self, asset_rule: AssetRule, workspace_path: Path, processed_maps_details_asset: Dict[str, Dict[str, Dict]],
current_asset_metadata: Dict) -> Dict[str, Dict[str, Dict]]: """ Merges channels from different source maps for a specific asset based on static merge rules in configuration, using explicit file paths from the AssetRule. Args: asset_rule: The AssetRule object containing file rules for this asset. workspace_path: Path to the directory containing the source files. processed_maps_details_asset: Details of processed maps (used to find common resolutions). current_asset_metadata: Mutable metadata dictionary for the current asset (updated for stats). Returns: Dict[str, Dict[str, Dict]]: Details of the merged maps created for this asset. """ if not self.temp_dir: raise ProcessingEngineError("Engine workspace (temp_dir) not setup.") asset_name = asset_rule.asset_name # Get merge rules from static config merge_rules = self.config_obj.map_merge_rules log.info(f"Asset '{asset_name}': Applying {len(merge_rules)} map merging rule(s) from static config...") # Initialize results for this asset merged_maps_details_asset: Dict[str, Dict[str, Dict]] = defaultdict(dict) for rule_index, rule in enumerate(merge_rules): output_map_type = rule.get("output_map_type") inputs_mapping = rule.get("inputs") # e.g., {"R": "AO", "G": "ROUGH", "B": "METAL"} defaults = rule.get("defaults", {}) rule_bit_depth = rule.get("output_bit_depth", "respect_inputs") if not output_map_type or not inputs_mapping: log.warning(f"Asset '{asset_name}': Skipping static merge rule #{rule_index+1}: Missing 'output_map_type' or 'inputs'. 
Rule: {rule}") continue log.info(f"-- Asset '{asset_name}': Applying merge rule for '{output_map_type}' --") # --- Find required SOURCE FileRules within the AssetRule --- required_input_file_rules: Dict[str, FileRule] = {} # map_type -> FileRule possible_to_find_sources = True input_types_needed = set(inputs_mapping.values()) # e.g., {"AO", "ROUGH", "METAL"} for input_type in input_types_needed: found_rule_for_type = False # Search in the asset_rule's files for file_rule in asset_rule.files: # Check if the file_rule's item_type_override matches the required input type item_override = getattr(file_rule, 'item_type_override', None) item_base_type = getattr(file_rule, 'item_type', None) # Get base type for ignore check # Check if override matches the required input type AND the base type is not FILE_IGNORE if item_override == input_type and item_base_type != "FILE_IGNORE": # Found a valid match based on item_type_override and not ignored required_input_file_rules[input_type] = file_rule found_rule_for_type = True # Update log message (see step 2) log.debug(f"Found source FileRule for merge input '{input_type}': {file_rule.file_path} (ItemTypeOverride: {item_override}, ItemType: {item_base_type})") break # Take the first valid match found if not found_rule_for_type: log.warning(f"Asset '{asset_name}': Required source FileRule for input map type '{input_type}' not found in AssetRule. Cannot perform merge for '{output_map_type}'.") possible_to_find_sources = False break if not possible_to_find_sources: continue # Skip this merge rule # --- Determine common resolutions based on *processed* maps --- # This still seems the most reliable way to know which sizes are actually available possible_resolutions_per_input: List[Set[str]] = [] resolutions_config = self.config_obj.image_resolutions # Static config for input_type in input_types_needed: # Find the corresponding processed map details (might be ROUGH-1, ROUGH-2 etc.) 
processed_details_for_input = None input_file_rule = required_input_file_rules.get(input_type) if input_file_rule: processed_details_for_input = processed_maps_details_asset.get(input_file_rule.item_type_override) # Use the correct attribute if processed_details_for_input: res_keys = {res for res, details in processed_details_for_input.items() if isinstance(details, dict) and 'error' not in details} if not res_keys: log.warning(f"Asset '{asset_name}': Input map type '{input_type}' (using {input_file_rule.item_type_override if input_file_rule else 'N/A'}) for merge rule '{output_map_type}' has no successfully processed resolutions.") # Use item_type_override possible_resolutions_per_input = [] # Invalidate if any input has no resolutions break possible_resolutions_per_input.append(res_keys) else: # If the input map wasn't processed individually (used_for_merge_only=True) # Assume all configured resolutions are potentially available. Loading will handle skips. log.debug(f"Input map type '{input_type}' for merge rule '{output_map_type}' might not have been processed individually. Assuming all configured resolutions possible.") possible_resolutions_per_input.append(set(resolutions_config.keys())) if not possible_resolutions_per_input: log.warning(f"Asset '{asset_name}': Cannot determine common resolutions for '{output_map_type}'. Skipping rule.") continue common_resolutions = set.intersection(*possible_resolutions_per_input) if not common_resolutions: log.warning(f"Asset '{asset_name}': No common resolutions found among required inputs {input_types_needed} for merge rule '{output_map_type}'. Skipping rule.") continue log.debug(f"Asset '{asset_name}': Common resolutions for '{output_map_type}': {common_resolutions}") # --- Loop through common resolutions --- res_order = {k: resolutions_config[k] for k in common_resolutions if k in resolutions_config} if not res_order: log.warning(f"Asset '{asset_name}': Common resolutions {common_resolutions} do not match config. 
Skipping merge for '{output_map_type}'.") continue sorted_res_keys = sorted(res_order.keys(), key=lambda k: res_order[k], reverse=True) base_name = asset_name # Use current asset's name for current_res_key in sorted_res_keys: log.debug(f"Asset '{asset_name}': Merging '{output_map_type}' for resolution: {current_res_key}") try: loaded_inputs_data = {} # map_type -> loaded numpy array source_info_for_save = {'involved_extensions': set(), 'max_input_bit_depth': 8} # --- Load required SOURCE maps using helper --- possible_to_load = True target_channels = list(inputs_mapping.keys()) # e.g., ['R', 'G', 'B'] for map_type_needed in input_types_needed: # e.g., {"AO", "ROUGH", "METAL"} file_rule = required_input_file_rules.get(map_type_needed) if not file_rule: log.error(f"Internal Error: FileRule missing for '{map_type_needed}' during merge load.") possible_to_load = False; break source_path_rel_str = file_rule.file_path # Keep original string if needed source_path_rel = Path(source_path_rel_str) # Convert to Path object source_path_abs = workspace_path / source_path_rel is_gloss = file_rule.item_type_override in getattr(self.config_obj, 'gloss_map_identifiers', []) original_ext = source_path_rel.suffix.lower() # Now works on Path object source_info_for_save['involved_extensions'].add(original_ext) log.debug(f"Loading source '{source_path_rel}' for merge input '{map_type_needed}' at {current_res_key} (Gloss: {is_gloss})") img_resized, source_dtype = self._load_and_transform_source( source_path_abs=source_path_abs, map_type=file_rule.item_type_override, # Use the specific type override from rule (e.g., ROUGH-1) target_resolution_key=current_res_key, is_gloss_source=is_gloss # self.loaded_data_cache used internally ) if img_resized is None: log.warning(f"Asset '{asset_name}': Failed to load/transform source '{source_path_rel}' for merge input '{map_type_needed}' at {current_res_key}. 
Skipping resolution.") possible_to_load = False; break loaded_inputs_data[map_type_needed] = img_resized # Store by base type (AO, ROUGH) # Track max source bit depth if source_dtype == np.uint16: source_info_for_save['max_input_bit_depth'] = max(source_info_for_save['max_input_bit_depth'], 16) # Add other dtype checks if needed if not possible_to_load: continue # --- Calculate Stats for ROUGH source if used and at stats resolution --- stats_res_key = self.config_obj.calculate_stats_resolution if current_res_key == stats_res_key: log.debug(f"Asset '{asset_name}': Checking for ROUGH source stats for '{output_map_type}' at {stats_res_key}") for target_channel, source_map_type in inputs_mapping.items(): if source_map_type == 'ROUGH' and source_map_type in loaded_inputs_data: log.debug(f"Asset '{asset_name}': Calculating stats for ROUGH source (mapped to channel '{target_channel}') for '{output_map_type}' at {stats_res_key}") rough_image_data = loaded_inputs_data[source_map_type] rough_stats = _calculate_image_stats(rough_image_data) if rough_stats: # Update the mutable metadata dict passed in stats_dict = current_asset_metadata.setdefault("merged_map_channel_stats", {}).setdefault(output_map_type, {}).setdefault(target_channel, {}) stats_dict[stats_res_key] = rough_stats log.debug(f"Asset '{asset_name}': Stored ROUGH stats for '{output_map_type}' channel '{target_channel}' at {stats_res_key}: {rough_stats}") else: log.warning(f"Asset '{asset_name}': Failed to calculate ROUGH stats for '{output_map_type}' channel '{target_channel}' at {stats_res_key}.") # --- Determine dimensions --- first_map_type = next(iter(loaded_inputs_data)) h, w = loaded_inputs_data[first_map_type].shape[:2] num_target_channels = len(target_channels) # --- Prepare and Merge Channels --- merged_channels_float32 = [] for target_channel in target_channels: # e.g., 'R', 'G', 'B' source_map_type = inputs_mapping.get(target_channel) # e.g., "AO", "ROUGH", "METAL" channel_data_float32 = None if 
source_map_type and source_map_type in loaded_inputs_data: img_input = loaded_inputs_data[source_map_type] # Get the loaded NumPy array # Ensure input is float32 0-1 range for merging if img_input.dtype == np.uint16: img_float = img_input.astype(np.float32) / 65535.0 elif img_input.dtype == np.uint8: img_float = img_input.astype(np.float32) / 255.0 elif img_input.dtype == np.float16: img_float = img_input.astype(np.float32) # Assume float16 is 0-1 else: img_float = img_input.astype(np.float32) # Assume other floats are 0-1 num_source_channels = img_float.shape[2] if len(img_float.shape) == 3 else 1 # Extract the correct channel if num_source_channels >= 3: if target_channel == 'R': channel_data_float32 = img_float[:, :, 0] elif target_channel == 'G': channel_data_float32 = img_float[:, :, 1] elif target_channel == 'B': channel_data_float32 = img_float[:, :, 2] elif target_channel == 'A' and num_source_channels == 4: channel_data_float32 = img_float[:, :, 3] else: log.warning(f"Target channel '{target_channel}' invalid for 3/4 channel source '{source_map_type}'.") elif num_source_channels == 1 or len(img_float.shape) == 2: # If source is grayscale, use it for R, G, B, or A target channels channel_data_float32 = img_float.reshape(h, w) else: log.warning(f"Unexpected shape {img_float.shape} for source '{source_map_type}'.") # Apply default if channel data couldn't be extracted if channel_data_float32 is None: default_val = defaults.get(target_channel) if default_val is None: raise ProcessingEngineError(f"Missing input/default for target channel '{target_channel}' in merge rule '{output_map_type}'.") log.debug(f"Using default value {default_val} for target channel '{target_channel}' in '{output_map_type}'.") channel_data_float32 = np.full((h, w), float(default_val), dtype=np.float32) merged_channels_float32.append(channel_data_float32) if not merged_channels_float32 or len(merged_channels_float32) != num_target_channels: raise ProcessingEngineError(f"Channel count 
mismatch during merge for '{output_map_type}'. Expected {num_target_channels}, got {len(merged_channels_float32)}.") merged_image_float32 = cv2.merge(merged_channels_float32) log.debug(f"Merged channels for '{output_map_type}' ({current_res_key}). Result shape: {merged_image_float32.shape}, dtype: {merged_image_float32.dtype}") # --- Save Merged Map using Helper --- # Get supplier name from metadata (set in process method) supplier_name = current_asset_metadata.get("supplier_name", "UnknownSupplier") save_result = self._save_image( image_data=merged_image_float32, supplier_name=supplier_name, asset_name=base_name, current_map_identifier=output_map_type, # Merged map type resolution_key=current_res_key, source_info=source_info_for_save, output_bit_depth_rule=rule_bit_depth ) # --- Record details locally --- if save_result: merged_maps_details_asset[output_map_type][current_res_key] = save_result else: log.error(f"Asset '{asset_name}': Failed to save merged map '{output_map_type}' at resolution '{current_res_key}'.") merged_maps_details_asset.setdefault(output_map_type, {})[f'error_{current_res_key}'] = "Save failed via helper" except Exception as merge_res_err: log.error(f"Asset '{asset_name}': Failed merging '{output_map_type}' at resolution '{current_res_key}': {merge_res_err}", exc_info=True) # Store error locally for this asset merged_maps_details_asset.setdefault(output_map_type, {})[f'error_{current_res_key}'] = str(merge_res_err) log.info(f"Asset '{asset_name}': Finished applying map merging rules.") # Return the details for this asset return merged_maps_details_asset def _generate_metadata_file(self, effective_supplier: str, asset_rule: AssetRule, current_asset_metadata: Dict, processed_maps_details_asset: Dict[str, Dict[str, Dict]], merged_maps_details_asset: Dict[str, Dict[str, Dict]]) -> Tuple[Path, str]: """ Gathers metadata for a specific asset based on the AssetRule and processing results, and writes it to a temporary JSON file in the engine's temp_dir 
using separate directory/filename patterns. Args: effective_supplier: The supplier name to use (override or original). asset_rule: The AssetRule object for this asset. current_asset_metadata: Base metadata dictionary (already contains name, category, archetype, stats, aspect ratio, map_details). processed_maps_details_asset: Details of processed maps for this asset. merged_maps_details_asset: Details of merged maps for this asset. Returns: Tuple[Path, str]: A tuple containing the relative directory Path object and the filename string within the temp_dir. """ if not self.temp_dir: raise ProcessingEngineError("Engine workspace (temp_dir) not setup.") asset_name = asset_rule.asset_name if not asset_name: log.warning("Asset name missing during metadata generation, file may be incomplete or incorrectly named.") asset_name = "UnknownAsset_Metadata" # Fallback for filename log.info(f"Generating metadata file for asset '{asset_name}' (Supplier: {effective_supplier})...") # Start with the base metadata passed in (already contains name, category, archetype, stats, aspect, map_details) final_metadata = current_asset_metadata.copy() final_metadata["category"] = asset_rule.asset_type # Ensure standardized asset type is in metadata # Use the effective supplier passed as argument final_metadata["supplier_name"] = effective_supplier # Already determined in process() # Populate map resolution details from processing results final_metadata["processed_map_resolutions"] = {} for map_type, res_dict in processed_maps_details_asset.items(): keys = [res for res, d in res_dict.items() if isinstance(d, dict) and 'error' not in d] if keys: final_metadata["processed_map_resolutions"][map_type] = sorted(keys) final_metadata["merged_map_resolutions"] = {} for map_type, res_dict in merged_maps_details_asset.items(): keys = [res for res, d in res_dict.items() if isinstance(d, dict) and 'error' not in d] if keys: final_metadata["merged_map_resolutions"][map_type] = sorted(keys) # Determine maps 
present based on successful processing for this asset final_metadata["maps_present"] = sorted(list(processed_maps_details_asset.keys())) final_metadata["merged_maps"] = sorted(list(merged_maps_details_asset.keys())) # Determine shader features based on this asset's maps and rules features = set() map_details_asset = final_metadata.get("map_details", {}) # Get from metadata dict for map_type, details in map_details_asset.items(): # map_type here is item_type_override like "MAP_COL-1" base_standard_type = self._get_base_map_type(map_type) # Should give "COL" # Check standard feature types if base_standard_type in ["SSS", "FUZZ", "MASK", "TRANSMISSION", "EMISSION", "CLEARCOAT"]: features.add(base_standard_type) if details.get("derived_from_gloss"): features.add("InvertedGloss") # Check if any resolution was saved as 16-bit res_details = processed_maps_details_asset.get(map_type, {}) if any(res_info.get("bit_depth") == 16 for res_info in res_details.values() if isinstance(res_info, dict)): features.add(f"16bit_{base_standard_type}") # Check merged maps for 16-bit output for map_type, res_dict in merged_maps_details_asset.items(): # map_type here is "NRMRGH" base_standard_type = self._get_base_map_type(map_type) # Should give "NRMRGH" if any(res_info.get("bit_depth") == 16 for res_info in res_dict.values() if isinstance(res_info, dict)): features.add(f"16bit_{base_standard_type}") final_metadata["shader_features"] = sorted(list(features)) # Determine source files in this asset's Extra folder based on FileRule category source_files_in_extra_set = set() for file_rule in asset_rule.files: if file_rule.item_type_override is None: # Assume files without an assigned type are extra/ignored/unmatched source_files_in_extra_set.add(str(file_rule.file_path)) final_metadata["source_files_in_extra"] = sorted(list(source_files_in_extra_set)) # Add processing info final_metadata["_processing_info"] = { "preset_used": self.config_obj.preset_name, # Preset name comes from the engine's 
config "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "input_source": effective_supplier, # Use the effective supplier } # Sort lists just before writing for key in ["maps_present", "merged_maps", "shader_features", "source_files_in_extra"]: if key in final_metadata and isinstance(final_metadata[key], list): final_metadata[key].sort() # --- Generate Path and Save --- # Get the new separate patterns from config output_directory_pattern = self.config_obj.get('OUTPUT_DIRECTORY_PATTERN', '[supplier]/[assetname]') output_filename_pattern = self.config_obj.get('OUTPUT_FILENAME_PATTERN', '[assetname]_[maptype]_[resolution].[ext]') metadata_filename_base = self.config_obj.metadata_filename # e.g., "metadata.json" metadata_ext = Path(metadata_filename_base).suffix.lstrip('.') or 'json' metadata_maptype = Path(metadata_filename_base).stem # Use filename stem as maptype token token_data = { "supplier": _sanitize_filename(effective_supplier), "assetname": _sanitize_filename(asset_name), "maptype": metadata_maptype, # Use filename stem for maptype token "resolution": "meta", # Use a fixed value for resolution token "width": 0, # Not applicable "height": 0, # Not applicable "bitdepth": 0, # Not applicable "ext": metadata_ext # Use extension from config filename } # Add optional token data if available if hasattr(self, 'current_incrementing_value') and self.current_incrementing_value is not None: token_data['incrementingvalue'] = self.current_incrementing_value if hasattr(self, 'current_sha5_value') and self.current_sha5_value is not None: token_data['sha5'] = self.current_sha5_value log.debug(f"Token data for _generate_metadata_file path generation: {token_data}") # DEBUG LOG try: # Generate directory and filename separately relative_dir_path_str = generate_path_from_pattern(output_directory_pattern, token_data) filename_str = generate_path_from_pattern(output_filename_pattern, token_data) # Combine for the full temporary path full_relative_path_str = 
str(Path(relative_dir_path_str) / filename_str) relative_dir_path = Path(relative_dir_path_str) # Keep the directory Path object except Exception as path_gen_err: log.error(f"Failed to generate metadata path using patterns '{output_directory_pattern}' / '{output_filename_pattern}' and data {token_data}: {path_gen_err}", exc_info=True) raise ProcessingEngineError(f"Failed to generate metadata path for asset '{asset_name}'") from path_gen_err output_path_temp_abs = self.temp_dir / full_relative_path_str # Save to engine's temp dir, preserving structure log.debug(f"Writing metadata for asset '{asset_name}' to temporary file: {output_path_temp_abs}") # Ensure parent directory exists in temp (using the full path) try: output_path_temp_abs.parent.mkdir(parents=True, exist_ok=True) except Exception as mkdir_err: log.error(f"Failed to create temporary directory {output_path_temp_abs.parent} for metadata: {mkdir_err}", exc_info=True) raise ProcessingEngineError(f"Failed to create temporary directory for metadata for asset '{asset_name}'") from mkdir_err try: with open(output_path_temp_abs, 'w', encoding='utf-8') as f: json.dump(final_metadata, f, indent=4, ensure_ascii=False, sort_keys=True) log.info(f"Metadata file '{filename_str}' generated successfully for asset '{asset_name}' at relative temp path '{full_relative_path_str}'.") # Return the RELATIVE directory Path object and the filename string return relative_dir_path, filename_str except Exception as e: raise ProcessingEngineError(f"Failed to write metadata file {output_path_temp_abs} for asset '{asset_name}': {e}") from e def _organize_output_files(self, asset_rule: AssetRule, workspace_path: Path, supplier_identifier: str, output_base_path: Path, processed_maps_details_asset: Dict[str, Dict[str, Dict]], merged_maps_details_asset: Dict[str, Dict[str, Dict]], temp_metadata_info: Tuple[Path, str]): """ Moves/copies processed files for a specific asset from the engine's temp dir and copies EXTRA files from the original 
workspace to the final output structure, using the relative paths generated by the token pattern. Args: asset_rule: The AssetRule object for this asset. workspace_path: Path to the original workspace containing source files. supplier_identifier: The supplier identifier from the SourceRule. output_base_path: The final base output directory. processed_maps_details_asset: Details of processed maps for this asset. merged_maps_details_asset: Details of merged maps for this asset. temp_metadata_info: Tuple containing the relative directory Path and filename string for the metadata file within temp_dir. """ if not self.temp_dir or not self.temp_dir.exists(): raise ProcessingEngineError("Engine temp workspace missing.") asset_name = asset_rule.asset_name if not asset_name: raise ProcessingEngineError("Asset name missing for organization.") if not asset_name: raise ProcessingEngineError("Asset name missing for organization.") asset_name_sanitized = _sanitize_filename(asset_name) # Still useful for logging # Get structure names from static config extra_subdir_name = self.config_obj.extra_files_subdir log.info(f"Organizing output files for asset '{asset_name_sanitized}' using generated paths relative to: {output_base_path}") # --- Helper for moving files from engine's temp dir to final output --- def _safe_move_to_final(src_rel_path_str: str | None, file_desc: str): """Moves a file from temp to its final location based on its relative path string.""" if not src_rel_path_str: log.warning(f"Asset '{asset_name_sanitized}': Missing src relative path string for {file_desc}. 
Cannot move.") return source_abs = self.temp_dir / src_rel_path_str # Absolute path in temp dest_abs = output_base_path / src_rel_path_str # Final absolute path try: if source_abs.exists(): # Ensure final destination directory exists dest_abs.parent.mkdir(parents=True, exist_ok=True) log.debug(f"Asset '{asset_name_sanitized}': Moving {file_desc}: {src_rel_path_str} -> {dest_abs.relative_to(output_base_path)}") shutil.move(str(source_abs), str(dest_abs)) else: log.warning(f"Asset '{asset_name_sanitized}': Source file missing in engine temp for {file_desc}: {source_abs}") except Exception as e: log.error(f"Asset '{asset_name_sanitized}': Failed moving {file_desc} '{src_rel_path_str}': {e}", exc_info=True) # --- Move Processed/Merged Maps --- moved_map_count = 0 for details_dict in [processed_maps_details_asset, merged_maps_details_asset]: for map_type, res_dict in details_dict.items(): # Skip if the whole map type failed (e.g., merge rule source missing) if isinstance(res_dict, dict) and 'error' in res_dict and len(res_dict) == 1: log.warning(f"Skipping move for map type '{map_type}' due to processing error: {res_dict['error']}") continue for res_key, details in res_dict.items(): # Skip specific resolution errors if isinstance(details, str) and details.startswith("error_"): log.warning(f"Skipping move for {map_type} ({res_key}) due to error: {details}") continue if isinstance(details, dict) and 'path' in details: # details['path'] is the relative path string within temp_dir relative_path_str = details['path'] _safe_move_to_final(relative_path_str, f"{map_type} ({res_key})") moved_map_count += 1 log.debug(f"Asset '{asset_name_sanitized}': Moved {moved_map_count} map files.") # --- Move Metadata File --- if temp_metadata_info: relative_dir_path, filename = temp_metadata_info metadata_rel_path_str = str(relative_dir_path / filename) _safe_move_to_final(metadata_rel_path_str, "metadata file") else: log.warning(f"Asset '{asset_name_sanitized}': Temporary metadata info 
missing. Cannot move metadata file.") # --- Handle "EXTRA" Files (copy from original workspace to final asset dir) --- # Determine the final asset directory based on the metadata's relative directory path final_asset_relative_dir = relative_dir_path if temp_metadata_info else None if final_asset_relative_dir is not None: # Check explicitly for None final_extra_dir_abs = output_base_path / final_asset_relative_dir / extra_subdir_name log.debug(f"Asset '{asset_name_sanitized}': Determined final EXTRA directory: {final_extra_dir_abs}") copied_extra_files = [] for file_rule in asset_rule.files: # Copy files explicitly marked as EXTRA or those with no item_type_override (unmatched) if file_rule.item_type_override == "EXTRA" or file_rule.item_type_override is None: try: source_rel_path = Path(file_rule.file_path) source_abs = workspace_path / source_rel_path # Place in Extra subdir within the final asset dir, keep original name dest_abs = final_extra_dir_abs / source_rel_path.name if source_abs.is_file(): log.debug(f"Asset '{asset_name_sanitized}': Copying EXTRA/unmatched file: {source_rel_path} -> {final_extra_dir_abs.relative_to(output_base_path)}/") final_extra_dir_abs.mkdir(parents=True, exist_ok=True) shutil.copy2(str(source_abs), str(dest_abs)) # copy2 preserves metadata copied_extra_files.append(source_rel_path.name) elif source_abs.is_dir(): log.debug(f"Asset '{asset_name_sanitized}': Skipping EXTRA/unmatched directory: {source_rel_path}") else: log.warning(f"Asset '{asset_name_sanitized}': Source file marked as EXTRA/unmatched not found in workspace: {source_abs}") except Exception as copy_err: log.error(f"Asset '{asset_name_sanitized}': Failed copying EXTRA/unmatched file '{file_rule.file_path}': {copy_err}", exc_info=True) if copied_extra_files: log.info(f"Asset '{asset_name_sanitized}': Copied {len(copied_extra_files)} EXTRA/unmatched file(s) to '{final_extra_dir_abs.relative_to(output_base_path)}' subdirectory.") else: log.warning(f"Asset 
'{asset_name_sanitized}': Could not determine final asset directory from metadata info '{temp_metadata_info}'. Skipping EXTRA file copying.") log.info(f"Finished organizing output for asset '{asset_name_sanitized}'.")