# Asset-Frameworker/asset_processor.py
# Snapshot metadata (kept as comments so the module stays importable):
#   captured 2025-04-29 18:26:13 +02:00 — 2514 lines, 153 KiB, Python
# asset_processor.py
import os
import math
import shutil
import tempfile
import zipfile
import logging
import json
import re
import time
from pathlib import Path
from fnmatch import fnmatch # For pattern matching like *.fbx, *_Preview*
from typing import List, Dict, Tuple, Optional # Added for type hinting
from collections import defaultdict # Added for grouping
# Attempt to import archive libraries
try:
import rarfile
import py7zr
except ImportError as e:
print(f"ERROR: Missing required archive libraries: {e}")
print("Please install them using:")
print("pip install rarfile py7zr")
# Do not exit here, allow the script to run but extraction will fail for these types
rarfile = None # Set to None so checks can still be made
py7zr = None # Set to None
# Attempt to import image processing libraries
try:
import cv2
import numpy as np
except ImportError:
print("ERROR: Missing required image processing libraries. Please install opencv-python and numpy:")
print("pip install opencv-python numpy")
exit(1) # Exit if essential libraries are missing
# Attempt to import OpenEXR - Check if needed for advanced EXR flags/types
try:
import OpenEXR
import Imath
_HAS_OPENEXR = True
except ImportError:
_HAS_OPENEXR = False
# Log this information - basic EXR might still work via OpenCV
logging.debug("Optional 'OpenEXR' python package not found. EXR saving relies on OpenCV's built-in support.")
# Assuming Configuration class is in configuration.py
try:
from configuration import Configuration, ConfigurationError
except ImportError:
print("ERROR: Cannot import Configuration class from configuration.py.")
print("Ensure configuration.py is in the same directory or Python path.")
exit(1)
# Use logger defined in main.py (or configure one here if run standalone)
log = logging.getLogger(__name__)
# Basic config if logger hasn't been set up elsewhere (e.g., during testing)
if not log.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') # Reverted basicConfig level
# --- Custom Exception ---
class AssetProcessingError(Exception):
    """Raised when an asset cannot be processed (extraction, classification, or conversion failure)."""
# --- Helper Functions ---
def calculate_target_dimensions(orig_w, orig_h, target_max_dim) -> tuple[int, int]:
    """
    Compute output dimensions for a resize: first scale the source so that its
    dominant side equals target_max_dim while keeping the aspect ratio, then
    snap each scaled side to its nearest power of two (stretch/squash to POT).
    """
    # Degenerate input: fall back to a square POT texture at the target size.
    if orig_w <= 0 or orig_h <= 0:
        pot_dim = get_nearest_pot(target_max_dim)
        log.warning(f"Invalid original dimensions ({orig_w}x{orig_h}). Falling back to nearest POT of target_max_dim: {pot_dim}x{pot_dim}")
        return (pot_dim, pot_dim)
    # Scale so the dominant dimension hits target_max_dim exactly.
    ratio = orig_w / orig_h
    if ratio > 1:
        # Wider than tall: width drives the scale.
        scaled_w, scaled_h = target_max_dim, max(1, round(target_max_dim / ratio))
    else:
        # Taller than wide, or square: height drives the scale.
        scaled_h, scaled_w = target_max_dim, max(1, round(target_max_dim * ratio))
    # Snap each side independently to the closest power of two.
    pot_w, pot_h = get_nearest_pot(scaled_w), get_nearest_pot(scaled_h)
    log.debug(f"POT Calc: Orig=({orig_w}x{orig_h}), MaxDim={target_max_dim} -> Scaled=({scaled_w}x{scaled_h}) -> POT=({pot_w}x{pot_h})")
    return int(pot_w), int(pot_h)
def _calculate_image_stats(image_data: np.ndarray) -> dict | None:
    """
    Compute min/max/mean statistics for an image array.

    Grayscale (H, W) input yields scalar stats; color (H, W, C) input yields
    per-channel lists (channels assumed RGB-ordered by the caller). Integer
    data is normalized to 0-1 before measuring; float data is assumed to
    already be in that range. Returns None for unusable input, or
    {"error": ...} if an unexpected failure occurs.
    """
    if image_data is None:
        log.warning("Attempted to calculate stats on None image data.")
        return None
    try:
        # float64 working copy avoids overflow/precision loss in the mean.
        data_float = image_data.astype(np.float64)
        if image_data.dtype == np.uint16:
            log.debug("Stats calculation: Normalizing uint16 data to 0-1 range.")
            data_float /= 65535.0
        elif image_data.dtype == np.uint8:
            log.debug("Stats calculation: Normalizing uint8 data to 0-1 range.")
            data_float /= 255.0
        log.debug(f"Stats calculation: data_float dtype: {data_float.dtype}, shape: {data_float.shape}")
        if data_float.size > 0:
            sample_values = data_float.flatten()[:10]
            log.debug(f"Stats calculation: Sample values (first 10) after normalization: {sample_values.tolist()}")
        if data_float.ndim == 2:
            # Grayscale: scalar statistics.
            stats = {
                "min": float(np.min(data_float)),
                "max": float(np.max(data_float)),
                "mean": float(np.mean(data_float)),
            }
            log.debug(f"Calculated Grayscale Stats: Min={stats['min']:.4f}, Max={stats['max']:.4f}, Mean={stats['mean']:.4f}")
            return stats
        if data_float.ndim == 3:
            # Color: reduce over the spatial axes, one value per channel.
            channels = data_float.shape[2]
            min_val = [float(v) for v in np.min(data_float, axis=(0, 1))]
            max_val = [float(v) for v in np.max(data_float, axis=(0, 1))]
            mean_val = [float(v) for v in np.mean(data_float, axis=(0, 1))]
            log.debug(f"Calculated {channels}-Channel Stats (RGB order): Min={min_val}, Max={max_val}, Mean={mean_val}")
            return {"min": min_val, "max": max_val, "mean": mean_val}
        log.warning(f"Cannot calculate stats for image with unsupported shape {data_float.shape}")
        return None
    except Exception as e:
        log.error(f"Error calculating image stats: {e}", exc_info=True)
        return {"error": str(e)}
# --- Helper function ---
def _get_base_map_type(target_map_string: str) -> str:
"""Extracts the base map type (e.g., 'COL') from a potentially numbered string ('COL-1')."""
match = re.match(r"([a-zA-Z]+)", target_map_string)
if match:
return match.group(1).upper()
return target_map_string.upper() # Fallback if no number suffix
def _is_power_of_two(n: int) -> bool:
"""Checks if a number is a power of two."""
return (n > 0) and (n & (n - 1) == 0)
def get_nearest_pot(value: int) -> int:
    """Return the power of two closest to `value`; exact ties round upward."""
    if value <= 0:
        return 1  # smallest valid POT; non-positive input has no meaningful answer
    if _is_power_of_two(value):
        return value
    # Bracket the value between its neighbouring powers of two.
    below = 1 << (value.bit_length() - 1)
    above = below << 1
    # A strictly closer lower bracket wins; otherwise (including ties) round up.
    return below if (value - below) < (above - value) else above
# --- Asset Processor Class ---
class AssetProcessor:
    """
    Handles the processing pipeline for a single asset (ZIP or folder).
    """
    # Map types loaded as single-channel grayscale by _load_and_transform_source.
    # MASK is listed here but gets special handling there: it is loaded unchanged
    # first so an alpha channel can be inspected before reducing to one channel.
    GRAYSCALE_MAP_TYPES = ['HEIGHT', 'ROUGH', 'METAL', 'AO', 'OPC', 'MASK']
def __init__(self, input_path: Path, config: Configuration, output_base_path: Path, overwrite: bool = False):
    """
    Initializes the processor for a given input asset.

    Args:
        input_path: Path to the input ZIP file or folder.
        config: The loaded Configuration object.
        output_base_path: The base directory where processed output will be saved.
        overwrite: If True, forces reprocessing even if output exists.

    Raises:
        TypeError: If config is not a Configuration instance.
        AssetProcessingError: If input_path is missing, or is neither a
            directory nor a supported archive (.zip/.rar/.7z).
    """
    # Coerce path-like arguments up front so all later checks use Path semantics.
    input_path = Path(input_path) if not isinstance(input_path, Path) else input_path
    output_base_path = Path(output_base_path) if not isinstance(output_base_path, Path) else output_base_path
    if not isinstance(config, Configuration):
        raise TypeError("config must be a Configuration object.")
    if not input_path.exists():
        raise AssetProcessingError(f"Input path does not exist: {input_path}")
    archive_suffixes = ('.zip', '.rar', '.7z')
    is_supported_archive = input_path.is_file() and input_path.suffix.lower() in archive_suffixes
    if not (input_path.is_dir() or is_supported_archive):
        raise AssetProcessingError(f"Input path must be a directory or a supported archive file (.zip, .rar, .7z): {input_path}")
    self.input_path: Path = input_path
    self.config: Configuration = config
    self.output_base_path: Path = output_base_path
    self.overwrite: bool = overwrite  # force reprocessing even if output already exists
    self.temp_dir: Path | None = None  # temporary working directory, set later by the workspace setup
    # Inventory of everything found in the extracted input, filled during classification.
    self.classified_files: dict[str, list[dict]] = {
        "maps": [], "models": [], "extra": [], "ignored": []
    }
    log.debug(f"AssetProcessor initialized for: {self.input_path.name}")
# --- New Helper Function: Load and Transform Source ---
def _load_and_transform_source(self, source_path_rel: Path, map_type: str, target_resolution_key: str, is_gloss_source: bool, cache: dict) -> Tuple[Optional[np.ndarray], Optional[np.dtype]]:
    """
    Loads a source image file, performs initial prep (MASK alpha extraction,
    BGR->RGB, Gloss->Rough inversion), resizes it to the target resolution,
    and caches the result.

    BUGFIX: previously the MASK preparation result (alpha extraction or
    BGR->gray conversion) was immediately overwritten by an unconditional
    `img_prepared = img_loaded` reassignment, so MASK sources were then
    processed like ordinary color maps. That reassignment has been removed;
    BGR->RGB conversion now only applies to non-MASK sources.

    Args:
        source_path_rel: Relative path to the source file within the temp directory.
        map_type: The standard map type (e.g., "NRM", "ROUGH").
        target_resolution_key: The key for the target resolution (e.g., "4K").
        is_gloss_source: Boolean indicating if this source should be treated as gloss for inversion.
        cache: The dictionary used for caching loaded/resized data.

    Returns:
        Tuple containing:
            - Resized NumPy array (float32 for gloss-inverted ROUGH, otherwise the
              resized dtype) or None if loading/processing fails or the target
              would upscale the source.
            - Original source NumPy dtype, or None if loading fails.
    """
    if not self.temp_dir:
        log.error("Temporary directory not set in _load_and_transform_source.")
        return None, None
    cache_key = (source_path_rel, target_resolution_key)
    if cache_key in cache:
        log.debug(f"CACHE HIT: Returning cached data for {source_path_rel} at {target_resolution_key}")
        return cache[cache_key]  # (image_data, source_dtype)
    log.debug(f"CACHE MISS: Loading and transforming {source_path_rel} for {target_resolution_key}")
    full_source_path = self.temp_dir / source_path_rel
    try:
        # --- 1. Load Source Image ---
        # Grayscale map types load single-channel; everything else loads unchanged.
        read_flag = cv2.IMREAD_GRAYSCALE if map_type.upper() in self.GRAYSCALE_MAP_TYPES else cv2.IMREAD_UNCHANGED
        # MASK: always load unchanged first so an alpha channel can be inspected.
        # NOTE(review): this exact-match check and the GRAYSCALE list use
        # map_type.upper(), while the MASK branch below uses _get_base_map_type —
        # numbered variants like 'MASK-2' take the UNCHANGED path; confirm intended.
        if map_type.upper() == 'MASK': read_flag = cv2.IMREAD_UNCHANGED
        log.debug(f"Loading source {full_source_path.name} with flag: {'GRAYSCALE' if read_flag == cv2.IMREAD_GRAYSCALE else 'UNCHANGED'}")
        img_loaded = cv2.imread(str(full_source_path), read_flag)
        if img_loaded is None:
            raise AssetProcessingError(f"Failed to load image file: {full_source_path.name} with flag {read_flag}")
        source_dtype = img_loaded.dtype
        log.debug(f"Loaded source {full_source_path.name}, dtype: {source_dtype}, shape: {img_loaded.shape}")
        # --- 2. Initial Preparation (MASK handling, BGR->RGB, Gloss Inversion) ---
        if _get_base_map_type(map_type) == 'MASK':
            # Reduce MASK sources to a single channel before anything else.
            log.debug(f"Processing as MASK type for {source_path_rel.name}.")
            shape = img_loaded.shape
            if len(shape) == 3 and shape[2] == 4:
                log.debug("MASK processing: Extracting alpha channel (4-channel source).")
                img_prepared = img_loaded[:, :, 3]
            elif len(shape) == 3 and shape[2] == 3:
                log.debug("MASK processing: Converting BGR to Grayscale (3-channel source).")  # OpenCV loads as BGR
                img_prepared = cv2.cvtColor(img_loaded, cv2.COLOR_BGR2GRAY)
            elif len(shape) == 2:
                log.debug("MASK processing: Source is already grayscale.")
                img_prepared = img_loaded
            else:
                log.warning(f"MASK processing: Unexpected source shape {shape}. Cannot reliably extract mask.")
                img_prepared = None  # flows to the None check below, which raises
            # Final uint8/float conversion for MASK is deferred to _save_image.
        else:
            img_prepared = img_loaded
            # BGR -> RGB conversion: OpenCV loads color images as BGR(A), the
            # rest of the pipeline works in RGB.
            if len(img_prepared.shape) == 3 and img_prepared.shape[2] >= 3:
                if read_flag != cv2.IMREAD_GRAYSCALE:
                    log.debug(f"Converting loaded image from BGR to RGB for {source_path_rel.name}.")
                    if img_prepared.shape[2] == 4:
                        img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGRA2RGB)
                    else:  # 3-channel (BGR)
                        img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_BGR2RGB)
                else:
                    log.debug(f"Skipping BGR->RGB conversion for {source_path_rel.name} as it was loaded grayscale.")
            elif len(img_prepared.shape) == 2:
                log.debug(f"Image {source_path_rel.name} is grayscale, no BGR->RGB conversion needed.")
            else:
                log.warning(f"Unexpected image shape {img_prepared.shape} for {source_path_rel.name} after loading.")
        # Gloss -> Roughness inversion (roughness = 1 - gloss), result is float32 0-1.
        if map_type == 'ROUGH' and is_gloss_source:
            log.info(f"Performing Gloss->Roughness inversion for {source_path_rel.name}")
            if len(img_prepared.shape) == 3:
                img_prepared = cv2.cvtColor(img_prepared, cv2.COLOR_RGB2GRAY)  # already RGB at this point
            # Normalize based on the original source dtype before inversion.
            if source_dtype == np.uint16:
                img_float = 1.0 - (img_prepared.astype(np.float32) / 65535.0)
            elif source_dtype == np.uint8:
                img_float = 1.0 - (img_prepared.astype(np.float32) / 255.0)
            else:  # float input assumed to already be in the 0-1 range
                img_float = 1.0 - img_prepared.astype(np.float32)
            img_prepared = np.clip(img_float, 0.0, 1.0)  # float32 result
            log.debug(f"Inverted gloss map stored as float32 for ROUGH, original dtype: {source_dtype}")
        # Ensure data is float32 for resizing if it came from gloss inversion
        if isinstance(img_prepared, np.ndarray) and img_prepared.dtype != np.float32 and map_type == 'ROUGH' and is_gloss_source:
            img_prepared = img_prepared.astype(np.float32)
        elif isinstance(img_prepared, np.ndarray) and img_prepared.dtype not in [np.uint8, np.uint16, np.float32, np.float16]:
            # Convert uncommon dtypes (e.g. bool) to float32 so cv2.resize accepts them.
            log.warning(f"Converting unexpected dtype {img_prepared.dtype} to float32 before resizing.")
            img_prepared = img_prepared.astype(np.float32)
        # --- 3. Resize ---
        if img_prepared is None:
            raise AssetProcessingError("Image data is None after initial prep.")
        orig_h, orig_w = img_prepared.shape[:2]
        target_dim_px = self.config.image_resolutions.get(target_resolution_key)
        if not target_dim_px:
            raise AssetProcessingError(f"Target resolution key '{target_resolution_key}' not found in config.")
        # Never upscale: skip target resolutions larger than the source.
        max_original_dimension = max(orig_w, orig_h)
        if target_dim_px > max_original_dimension:
            log.warning(f"Target dimension {target_dim_px}px is larger than original {max_original_dimension}px for {source_path_rel}. Skipping resize for {target_resolution_key}.")
            cache[cache_key] = (None, source_dtype)  # cache the skip so we don't retry
            return None, source_dtype
        if orig_w <= 0 or orig_h <= 0:
            raise AssetProcessingError(f"Invalid original dimensions ({orig_w}x{orig_h}) for {source_path_rel}.")
        target_w, target_h = calculate_target_dimensions(orig_w, orig_h, target_dim_px)
        # Lanczos for downscaling, cubic otherwise.
        interpolation = cv2.INTER_LANCZOS4 if (target_w * target_h) < (orig_w * orig_h) else cv2.INTER_CUBIC
        log.debug(f"Resizing {source_path_rel.name} from ({orig_w}x{orig_h}) to ({target_w}x{target_h}) for {target_resolution_key}")
        img_resized = cv2.resize(img_prepared, (target_w, target_h), interpolation=interpolation)
        # --- 4. Cache and Return ---
        # Keep gloss-inverted ROUGH maps as float32; otherwise keep the resized dtype.
        final_data_to_cache = img_resized
        if map_type == 'ROUGH' and is_gloss_source and final_data_to_cache.dtype != np.float32:
            final_data_to_cache = final_data_to_cache.astype(np.float32)
        log.debug(f"CACHING result for {cache_key}. Shape: {final_data_to_cache.shape}, Dtype: {final_data_to_cache.dtype}")
        cache[cache_key] = (final_data_to_cache, source_dtype)
        return final_data_to_cache, source_dtype
    except Exception as e:
        log.error(f"Error in _load_and_transform_source for {source_path_rel} at {target_resolution_key}: {e}", exc_info=True)
        cache[cache_key] = (None, None)  # cache the failure so this key isn't retried
        return None, None
# --- New Helper Function: Save Image ---
def _save_image(self, image_data: np.ndarray, map_type: str, resolution_key: str, asset_base_name: str, source_info: dict, output_bit_depth_rule: str, temp_dir: Path) -> Optional[Dict]:
    """
    Handles saving an image NumPy array to a temporary file, including determining
    format, bit depth, performing final conversions, and fallback logic.

    Pipeline: pick output bit depth from `output_bit_depth_rule` -> pick output
    format (forced-lossless / JPG above resolution threshold / inherit from the
    source formats involved) -> convert dtype -> RGB->BGR for OpenCV (non-EXR)
    -> imwrite, with a 16-bit PNG fallback when a 16-bit EXR write fails.

    Args:
        image_data: NumPy array containing the image data to save (RGB channel
            order for color data — converted to BGR just before writing).
        map_type: The standard map type being saved (e.g., "COL", "NRMRGH").
        resolution_key: The resolution key (e.g., "4K").
        asset_base_name: The sanitized base name of the asset.
        source_info: Dictionary containing details about the source(s), e.g.,
            {'original_extension': '.tif', 'source_bit_depth': 16, 'involved_extensions': {'.tif', '.png'}}
        output_bit_depth_rule: Rule for determining output bit depth
            ('respect', 'force_8bit', 'force_16bit', 'respect_inputs').
        temp_dir: The temporary directory path to save the file in.

    Returns:
        A dictionary containing details of the saved file (path, width, height,
        bit_depth, format) or None if saving failed.
    """
    # Guard clauses: nothing to save / nowhere to save it.
    if image_data is None:
        log.error(f"Cannot save image for {map_type} ({resolution_key}): image_data is None.")
        return None
    if not temp_dir or not temp_dir.exists():
        log.error(f"Cannot save image for {map_type} ({resolution_key}): temp_dir is invalid.")
        return None
    try:
        h, w = image_data.shape[:2]
        current_dtype = image_data.dtype
        log.debug(f"Saving {map_type} ({resolution_key}) for asset '{asset_base_name}'. Input shape: {image_data.shape}, dtype: {current_dtype}")
        # --- 1. Determine Output Bit Depth ---
        source_bpc = source_info.get('source_bit_depth', 8) # Default to 8 if missing
        max_input_bpc = source_info.get('max_input_bit_depth', source_bpc) # For 'respect_inputs' merge rule
        output_dtype_target, output_bit_depth = np.uint8, 8 # Default
        if output_bit_depth_rule == 'force_8bit':
            output_dtype_target, output_bit_depth = np.uint8, 8
        elif output_bit_depth_rule == 'force_16bit':
            output_dtype_target, output_bit_depth = np.uint16, 16
        elif output_bit_depth_rule == 'respect': # For individual maps: follow the source's bit depth
            if source_bpc == 16: output_dtype_target, output_bit_depth = np.uint16, 16
            # Handle float source? Assume 16-bit output if source was float? Needs clarification.
            # For now, stick to uint8/16 based on source_bpc.
        elif output_bit_depth_rule == 'respect_inputs': # For merged maps: follow the deepest input
            if max_input_bpc == 16: output_dtype_target, output_bit_depth = np.uint16, 16
        else: # Default to 8-bit if rule is unknown
            log.warning(f"Unknown output_bit_depth_rule '{output_bit_depth_rule}'. Defaulting to 8-bit.")
            output_dtype_target, output_bit_depth = np.uint8, 8
        log.debug(f"Target output bit depth: {output_bit_depth}-bit (dtype: {output_dtype_target.__name__}) based on rule '{output_bit_depth_rule}'")
        # --- 2. Determine Output Format ---
        output_format, output_ext, save_params, needs_float16 = "", "", [], False
        primary_fmt_16, fallback_fmt_16 = self.config.get_16bit_output_formats()
        fmt_8bit_config = self.config.get_8bit_output_format()
        threshold = self.config.resolution_threshold_for_jpg
        force_lossless = map_type in self.config.force_lossless_map_types
        original_extension = source_info.get('original_extension', '.png') # Primary source ext
        involved_extensions = source_info.get('involved_extensions', {original_extension}) # For merges
        target_dim_px = self.config.image_resolutions.get(resolution_key, 0) # Get target dimension size
        if force_lossless:
            # Lossless-only map types: EXR (half float) or PNG depending on bit depth.
            log.debug(f"Format forced to lossless for map type '{map_type}'.")
            if output_bit_depth == 16:
                output_format = primary_fmt_16
                if output_format.startswith("exr"):
                    output_ext, needs_float16 = ".exr", True
                    save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF])
                else: # Assume PNG if primary 16-bit isn't EXR
                    if output_format != "png": log.warning(f"Primary 16-bit format '{output_format}' not PNG/EXR for forced lossless. Using fallback '{fallback_fmt_16}'.")
                    output_format = fallback_fmt_16 if fallback_fmt_16 == "png" else "png" # Ensure PNG
                    output_ext = ".png"
                    png_level = self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)
                    save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, png_level])
            else: # 8-bit lossless -> PNG
                output_format = "png"; output_ext = ".png"
                png_level = self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)
                save_params = [cv2.IMWRITE_PNG_COMPRESSION, png_level]
        elif output_bit_depth == 8 and target_dim_px >= threshold:
            # Large 8-bit maps go lossy JPG to keep file sizes down.
            output_format = 'jpg'; output_ext = '.jpg'
            jpg_quality = self.config.jpg_quality
            save_params.extend([cv2.IMWRITE_JPEG_QUALITY, jpg_quality])
            log.debug(f"Using JPG format (Quality: {jpg_quality}) for {map_type} at {resolution_key} due to resolution threshold ({target_dim_px} >= {threshold}).")
        else:
            # Inherit quality from the "highest" source format involved
            # (exr > tif > png > jpg); merged maps consider all input extensions.
            highest_format_str = 'jpg' # Default lowest
            relevant_extensions = involved_extensions if map_type in self.config.map_merge_rules else {original_extension}
            if '.exr' in relevant_extensions: highest_format_str = 'exr'
            elif '.tif' in relevant_extensions: highest_format_str = 'tif'
            elif '.png' in relevant_extensions: highest_format_str = 'png'
            if highest_format_str == 'exr':
                if output_bit_depth == 16: output_format, output_ext, needs_float16 = "exr", ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF])
                else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)])
            elif highest_format_str == 'tif':
                # TIF source: 16-bit follows the configured primary format, else PNG.
                if output_bit_depth == 16:
                    output_format = primary_fmt_16
                    if output_format.startswith("exr"): output_ext, needs_float16 = ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF])
                    else: output_format = "png"; output_ext = ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)])
                else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)])
            elif highest_format_str == 'png':
                # PNG source: same policy as TIF.
                if output_bit_depth == 16:
                    output_format = primary_fmt_16
                    if output_format.startswith("exr"): output_ext, needs_float16 = ".exr", True; save_params.extend([cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_HALF])
                    else: output_format = "png"; output_ext = ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)])
                else: output_format, output_ext = "png", ".png"; save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)])
            else: # Default to configured 8-bit format if highest was JPG or unknown
                output_format = fmt_8bit_config; output_ext = f".{output_format}"
                if output_format == "png": save_params.extend([cv2.IMWRITE_PNG_COMPRESSION, self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)])
                elif output_format == "jpg": save_params.extend([cv2.IMWRITE_JPEG_QUALITY, self.config.jpg_quality])
        # Final check: JPG must be 8-bit
        if output_format == "jpg" and output_bit_depth == 16:
            log.warning(f"Output format is JPG, but target bit depth is 16. Forcing 8-bit for {map_type} ({resolution_key}).")
            output_dtype_target, output_bit_depth = np.uint8, 8
        log.debug(f"Determined save format: {output_format}, ext: {output_ext}, bit_depth: {output_bit_depth}, needs_float16: {needs_float16}")
        # --- 3. Final Data Type Conversion ---
        img_to_save = image_data.copy() # Work on a copy
        if output_dtype_target == np.uint8 and img_to_save.dtype != np.uint8:
            log.debug(f"Converting image data from {img_to_save.dtype} to uint8 for saving.")
            if img_to_save.dtype == np.uint16: img_to_save = (img_to_save.astype(np.float32) / 65535.0 * 255.0).astype(np.uint8)
            elif img_to_save.dtype in [np.float16, np.float32]: img_to_save = (np.clip(img_to_save, 0.0, 1.0) * 255.0).astype(np.uint8)
            else: img_to_save = img_to_save.astype(np.uint8) # Direct cast for other types (e.g., bool)
        elif output_dtype_target == np.uint16 and img_to_save.dtype != np.uint16:
            log.debug(f"Converting image data from {img_to_save.dtype} to uint16 for saving.")
            if img_to_save.dtype == np.uint8: img_to_save = img_to_save.astype(np.uint16) * 257 # Proper 8->16 bit scaling (255 * 257 == 65535)
            elif img_to_save.dtype in [np.float16, np.float32]: img_to_save = (np.clip(img_to_save, 0.0, 1.0) * 65535.0).astype(np.uint16)
            else: img_to_save = img_to_save.astype(np.uint16)
        if needs_float16 and img_to_save.dtype != np.float16:
            # EXR half-float output: normalize integer data back to 0-1 first.
            log.debug(f"Converting image data from {img_to_save.dtype} to float16 for EXR saving.")
            if img_to_save.dtype == np.uint16: img_to_save = (img_to_save.astype(np.float32) / 65535.0).astype(np.float16)
            elif img_to_save.dtype == np.uint8: img_to_save = (img_to_save.astype(np.float32) / 255.0).astype(np.float16)
            elif img_to_save.dtype == np.float32: img_to_save = img_to_save.astype(np.float16)
            else: log.warning(f"Cannot convert {img_to_save.dtype} to float16 for EXR save."); return None
        # --- 4. Final Color Space Conversion (RGB -> BGR for non-EXR) ---
        # cv2.imwrite expects BGR channel order for PNG/JPG; EXR is written as-is.
        img_save_final = img_to_save
        is_3_channel = len(img_to_save.shape) == 3 and img_to_save.shape[2] == 3
        if is_3_channel and not output_format.startswith("exr"):
            log.debug(f"Converting RGB to BGR for saving {map_type} ({resolution_key}) as {output_format}")
            try:
                img_save_final = cv2.cvtColor(img_to_save, cv2.COLOR_RGB2BGR)
            except Exception as cvt_err:
                log.error(f"Failed RGB->BGR conversion before save for {map_type} ({resolution_key}): {cvt_err}. Saving original RGB.")
                img_save_final = img_to_save # Fallback
        # --- 5. Construct Filename & Save ---
        filename = self.config.target_filename_pattern.format(
            base_name=asset_base_name,
            map_type=map_type,
            resolution=resolution_key,
            ext=output_ext.lstrip('.')
        )
        output_path_temp = temp_dir / filename
        log.debug(f"Attempting to save: {output_path_temp.name} (Format: {output_format}, Dtype: {img_save_final.dtype})")
        saved_successfully = False
        actual_format_saved = output_format
        # NOTE(review): cv2.imwrite commonly returns False rather than raising on
        # failure — the except/fallback path may never trigger in that case;
        # consider also checking the boolean return value.
        try:
            cv2.imwrite(str(output_path_temp), img_save_final, save_params)
            saved_successfully = True
            log.info(f" > Saved {map_type} ({resolution_key}, {output_bit_depth}-bit) as {output_format}")
        except Exception as save_err:
            log.error(f"Save failed ({output_format}) for {map_type} {resolution_key}: {save_err}")
            # --- Try Fallback: a failed 16-bit EXR write is retried as 16-bit PNG ---
            if output_bit_depth == 16 and output_format.startswith("exr") and fallback_fmt_16 != output_format and fallback_fmt_16 == "png":
                log.warning(f"Attempting fallback PNG save for {map_type} {resolution_key}")
                actual_format_saved = "png"; output_ext = ".png";
                filename = self.config.target_filename_pattern.format(base_name=asset_base_name, map_type=map_type, resolution=resolution_key, ext="png")
                output_path_temp = temp_dir / filename
                save_params_fallback = [cv2.IMWRITE_PNG_COMPRESSION, self.config._core_settings.get('PNG_COMPRESSION_LEVEL', 6)]
                img_fallback = None; target_fallback_dtype = np.uint16
                # Convert original data (before float16 conversion) to uint16 for PNG fallback.
                if img_to_save.dtype == np.float16: # This means original was likely float or uint16/8 converted to float16
                    # Safest is to convert the float16 back to uint16
                    img_scaled = np.clip(img_to_save.astype(np.float32) * 65535.0, 0, 65535)
                    img_fallback = img_scaled.astype(target_fallback_dtype)
                elif img_to_save.dtype == target_fallback_dtype: img_fallback = img_to_save # Already uint16
                else: log.error(f"Cannot convert {img_to_save.dtype} for PNG fallback."); return None
                # --- Conditional RGB -> BGR Conversion for fallback ---
                img_fallback_save_final = img_fallback
                is_3_channel_fallback = len(img_fallback.shape) == 3 and img_fallback.shape[2] == 3
                if is_3_channel_fallback: # PNG is non-EXR
                    log.debug(f"Converting RGB to BGR for fallback PNG save {map_type} ({resolution_key})")
                    try: img_fallback_save_final = cv2.cvtColor(img_fallback, cv2.COLOR_RGB2BGR)
                    except Exception as cvt_err_fb: log.error(f"Failed RGB->BGR conversion for fallback PNG: {cvt_err_fb}. Saving original.")
                try:
                    cv2.imwrite(str(output_path_temp), img_fallback_save_final, save_params_fallback)
                    saved_successfully = True
                    log.info(f" > Saved {map_type} ({resolution_key}) using fallback PNG")
                except Exception as fallback_err:
                    log.error(f"Fallback PNG save failed for {map_type} {resolution_key}: {fallback_err}", exc_info=True)
            else:
                log.error(f"No suitable fallback available or applicable for failed save of {map_type} ({resolution_key}) as {output_format}.")
        # --- 6. Return Result ---
        if saved_successfully:
            # NOTE(review): the stored path is made relative to self.temp_dir while
            # the file was written under the temp_dir parameter — confirm callers
            # always pass temp_dir == self.temp_dir (or a subdirectory of it).
            return {
                "path": output_path_temp.relative_to(self.temp_dir), # Store relative path
                "resolution": resolution_key,
                "width": w, "height": h,
                "bit_depth": output_bit_depth,
                "format": actual_format_saved
            }
        else:
            return None # Indicate save failure
    except Exception as e:
        log.error(f"Unexpected error in _save_image for {map_type} ({resolution_key}): {e}", exc_info=True)
        return None
def process(self) -> Dict[str, List[str]]:
    """
    Executes the full processing pipeline for the input path, handling
    multiple assets within a single input if detected.

    Pipeline: workspace setup -> input extraction -> file classification ->
    per-asset (filter files, assign map-type suffixes, determine metadata,
    skip-check, process maps, merge maps, write metadata, organize output)
    -> workspace cleanup (always runs via finally).

    Returns:
        Dict[str, List[str]]: A dictionary summarizing the status of each
        detected asset within the input:
        {"processed": [asset_name1, ...],
         "skipped": [asset_name2, ...],
         "failed": [asset_name3, ...]}

    Raises:
        AssetProcessingError: if a step *before* the per-asset loop fails
            (setup, extraction, classification). Failures of individual
            assets are recorded in the returned dict, not raised.
    """
    log.info(f"Starting processing for input: {self.input_path.name}")
    overall_status = {"processed": [], "skipped": [], "failed": []}
    supplier_name = self.config.supplier_name  # Get once
    loaded_data_cache = {}  # Initialize cache for this process call (shared across assets)
    try:
        self._setup_workspace()
        self._extract_input()
        self._inventory_and_classify_files()  # Classifies all files in self.classified_files
        # Determine distinct assets and file mapping
        distinct_base_names, file_to_base_name_map = self._determine_base_metadata()
        # Files with no base name (mapped to None) belong to no single asset;
        # they are later copied into every asset's Extra folder.
        unmatched_files_paths = [p for p, name in file_to_base_name_map.items() if name is None]
        if unmatched_files_paths:
            log.warning(f"Found {len(unmatched_files_paths)} files not matched to any specific asset base name. They will be copied to each asset's Extra folder.")
            log.debug(f"Unmatched files: {[str(p) for p in unmatched_files_paths]}")
        # --- Loop through each detected asset ---
        for current_asset_name in distinct_base_names:
            log.info(f"--- Processing detected asset: '{current_asset_name}' ---")
            asset_processed = False
            asset_skipped = False
            asset_failed = False
            temp_metadata_path_asset = None  # Track metadata file for this asset
            map_details_asset = {}  # Store map details for this asset
            try:
                # --- Filter classified files for the current asset ---
                # Keep only the entries whose source_path maps to this asset's base name.
                filtered_classified_files_asset = defaultdict(list)
                for category, file_list in self.classified_files.items():
                    for file_info in file_list:
                        file_path = file_info.get('source_path')
                        if file_path and file_to_base_name_map.get(file_path) == current_asset_name:
                            filtered_classified_files_asset[category].append(file_info)
                log.debug(f"Asset '{current_asset_name}': Filtered files - Maps: {len(filtered_classified_files_asset.get('maps',[]))}, Models: {len(filtered_classified_files_asset.get('models',[]))}, Extra: {len(filtered_classified_files_asset.get('extra',[]))}, Ignored: {len(filtered_classified_files_asset.get('ignored',[]))}")
                # --- Assign Suffixes Per-Asset ---
                # Classification stored only the BASE map type; numeric "-N"
                # suffixes are assigned here, per asset, for variant types.
                log.debug(f"Asset '{current_asset_name}': Assigning map type suffixes...")
                asset_maps = filtered_classified_files_asset.get('maps', [])
                grouped_asset_maps = defaultdict(list)
                for map_info in asset_maps:
                    # Group by the base map type stored earlier
                    grouped_asset_maps[map_info['map_type']].append(map_info)
                for base_map_type, maps_in_group in grouped_asset_maps.items():
                    log.debug(f"  Assigning suffixes for base type '{base_map_type}' within asset '{current_asset_name}' ({len(maps_in_group)} maps)")
                    # Sorting is already done by _inventory_and_classify_files, just need to assign suffix
                    respect_variants = base_map_type in self.config.respect_variant_map_types
                    for i, map_info in enumerate(maps_in_group):
                        if respect_variants:
                            final_map_type = f"{base_map_type}-{i + 1}"
                        else:
                            final_map_type = base_map_type
                        log.debug(f"    Updating '{map_info['source_path']}' map_type from '{map_info['map_type']}' to '{final_map_type}'")
                        map_info['map_type'] = final_map_type  # Update the map_type in the dictionary
                # --- Determine Metadata for this specific asset ---
                asset_specific_metadata = self._determine_single_asset_metadata(current_asset_name, filtered_classified_files_asset)
                current_asset_metadata = {
                    "asset_name": current_asset_name,
                    "supplier_name": supplier_name,
                    "asset_category": asset_specific_metadata.get("asset_category", self.config.default_asset_category),
                    "archetype": asset_specific_metadata.get("archetype", "Unknown"),
                    # Initialize fields that will be populated by processing steps
                    "maps_present": [],
                    "merged_maps": [],
                    "shader_features": [],
                    "source_files_in_extra": [],  # Will be populated in _generate_metadata
                    "image_stats_1k": {},
                    "map_details": {},  # Will be populated by _process_maps
                    "aspect_ratio_change_string": "N/A"
                }
                # --- Skip Check for this specific asset ---
                # An asset counts as already done if its output dir AND
                # metadata file both exist; only then is it skipped.
                if not self.overwrite:
                    supplier_sanitized = self._sanitize_filename(supplier_name)
                    asset_name_sanitized = self._sanitize_filename(current_asset_name)
                    final_dir = self.output_base_path / supplier_sanitized / asset_name_sanitized
                    metadata_file_path = final_dir / self.config.metadata_filename
                    if final_dir.exists() and metadata_file_path.is_file():
                        log.info(f"Output directory and metadata found for asset '{asset_name_sanitized}' and overwrite is False. Skipping this asset.")
                        overall_status["skipped"].append(current_asset_name)
                        asset_skipped = True
                        continue  # Skip to the next asset in the loop
                elif self.overwrite:  # NOTE: equivalent to a plain 'else' here
                    log.info(f"Overwrite flag is set. Processing asset '{current_asset_name}' even if output exists.")
                # --- Process Individual Maps for this asset ---
                processed_maps_details_asset, image_stats_asset, aspect_ratio_change_string_asset, ignored_rough_maps = self._process_individual_maps(
                    filtered_maps_list=filtered_classified_files_asset.get('maps', []),
                    current_asset_metadata=current_asset_metadata,  # Pass base metadata
                    loaded_data_cache=loaded_data_cache  # Pass cache
                )
                # Update current metadata with results
                current_asset_metadata["image_stats_1k"] = image_stats_asset
                current_asset_metadata["aspect_ratio_change_string"] = aspect_ratio_change_string_asset
                # Add newly ignored rough maps to the asset's specific ignored list
                if ignored_rough_maps:
                    filtered_classified_files_asset['ignored'].extend(ignored_rough_maps)
                # Store map details (like source bit depth) collected during processing,
                # restricted to maps that actually got processed.
                # This was previously stored in self.metadata["map_details"]
                map_details_asset = {k: v for k, v in current_asset_metadata.pop("map_details", {}).items() if k in processed_maps_details_asset}
                # --- Merge Maps from Source for this asset ---
                merged_maps_details_asset = self._merge_maps_from_source(
                    processed_maps_details_asset=processed_maps_details_asset,  # Still needed for source info lookup? Or pass classified files? Check impl.
                    filtered_classified_files=filtered_classified_files_asset,
                    current_asset_metadata=current_asset_metadata,
                    loaded_data_cache=loaded_data_cache  # Pass cache
                )
                # --- Generate Metadata for this asset ---
                temp_metadata_path_asset = self._generate_metadata_file(
                    current_asset_metadata=current_asset_metadata,  # Pass the populated dict
                    processed_maps_details_asset=processed_maps_details_asset,
                    merged_maps_details_asset=merged_maps_details_asset,
                    filtered_classified_files_asset=filtered_classified_files_asset,
                    unmatched_files_paths=unmatched_files_paths,  # Pass the list of unmatched files
                    map_details_asset=map_details_asset  # Pass the filtered map details
                )
                # --- Organize Output Files for this asset ---
                self._organize_output_files(
                    current_asset_name=current_asset_name,
                    processed_maps_details_asset=processed_maps_details_asset,
                    merged_maps_details_asset=merged_maps_details_asset,
                    filtered_classified_files_asset=filtered_classified_files_asset,
                    unmatched_files_paths=unmatched_files_paths,  # Pass unmatched files for copying
                    temp_metadata_path=temp_metadata_path_asset
                )
                log.info(f"--- Asset '{current_asset_name}' processed successfully. ---")
                overall_status["processed"].append(current_asset_name)
                asset_processed = True
            except Exception as asset_err:
                # Per-asset failures are contained: recorded and the loop continues.
                log.error(f"--- Failed processing asset '{current_asset_name}': {asset_err} ---", exc_info=True)
                overall_status["failed"].append(current_asset_name)
                asset_failed = True
                # Continue to the next asset even if one fails
        # --- Determine Final Consolidated Status ---
        # This logic remains the same, interpreting the overall_status dict.
        # NOTE: final_status is only used for the summary log line below;
        # the detailed dict is what gets returned.
        final_status = "failed"  # Default if nothing else matches
        if overall_status["processed"] and not overall_status["failed"]:
            final_status = "processed"
        elif overall_status["skipped"] and not overall_status["processed"] and not overall_status["failed"]:
            final_status = "skipped"
        elif overall_status["processed"] and overall_status["failed"]:
            final_status = "partial_success"  # Indicate some succeeded, some failed
        elif overall_status["processed"] and overall_status["skipped"] and not overall_status["failed"]:
            final_status = "processed"  # Consider processed+skipped as processed overall
        elif overall_status["skipped"] and overall_status["failed"] and not overall_status["processed"]:
            final_status = "failed"  # If only skips and fails, report as failed
        # Add any other combinations if needed
        log.info(f"Finished processing input '{self.input_path.name}'. Overall Status: {final_status}. Summary: {overall_status}")
        # Return the detailed status dictionary instead of just a string
        # The wrapper function in main.py will interpret this
        return overall_status
    except Exception as e:
        # Catch errors during initial setup (before asset loop)
        if not isinstance(e, (AssetProcessingError, ConfigurationError)):
            log.exception(f"Asset processing failed unexpectedly for {self.input_path.name} during setup: {e}")
        if not isinstance(e, AssetProcessingError):
            raise AssetProcessingError(f"Failed processing {self.input_path.name}: {e}") from e
        else:
            raise
    finally:
        # Ensure cleanup always happens
        self._cleanup_workspace()
def _setup_workspace(self):
"""Creates a temporary directory for processing."""
try:
self.temp_dir = Path(tempfile.mkdtemp(prefix=self.config.temp_dir_prefix))
log.debug(f"Created temporary workspace: {self.temp_dir}")
except Exception as e:
raise AssetProcessingError(f"Failed to create temporary workspace: {e}") from e
def _extract_input(self):
"""Extracts ZIP or copies folder contents to the temporary workspace."""
if not self.temp_dir:
raise AssetProcessingError("Temporary workspace not setup before extraction.")
log.info(f"Preparing source files from {self.input_path.name}...")
try:
if self.input_path.is_file():
suffix = self.input_path.suffix.lower()
if suffix == '.zip':
log.debug(f"Extracting ZIP file: {self.input_path}")
with zipfile.ZipFile(self.input_path, 'r') as zip_ref:
zip_ref.extractall(self.temp_dir)
log.info(f"ZIP extracted to {self.temp_dir}")
elif suffix == '.rar':
log.debug(f"Extracting RAR file: {self.input_path}")
# rarfile requires unrar to be installed and in the system's PATH
# We assume this is handled by the user's environment setup.
# Basic error handling for common rarfile exceptions.
try:
with rarfile.RarFile(self.input_path, 'r') as rar_ref:
rar_ref.extractall(self.temp_dir)
log.info(f"RAR extracted to {self.temp_dir}")
except rarfile.BadRarFile:
raise AssetProcessingError(f"Input file is not a valid RAR archive: {self.input_path.name}")
except rarfile.NeedFirstVolume:
raise AssetProcessingError(f"RAR archive is part of a multi-volume set, but the first volume is missing: {self.input_path.name}")
except rarfile.PasswordRequired:
# As per plan, we don't handle passwords at this stage
raise AssetProcessingError(f"RAR archive is password protected. Skipping: {self.input_path.name}")
except rarfile.NoRarEntry:
raise AssetProcessingError(f"RAR archive is empty or corrupted: {self.input_path.name}")
except Exception as rar_err:
# Catch any other unexpected rarfile errors
raise AssetProcessingError(f"Failed to extract RAR archive {self.input_path.name}: {rar_err}") from rar_err
elif suffix == '.7z':
log.debug(f"Extracting 7z file: {self.input_path}")
# py7zr handles extraction directly
try:
with py7zr.SevenZipFile(self.input_path, mode='r') as sz_ref:
sz_ref.extractall(path=self.temp_dir)
log.info(f"7z extracted to {self.temp_dir}")
except py7zr.Bad7zFile:
raise AssetProcessingError(f"Input file is not a valid 7z archive: {self.input_path.name}")
except py7zr.PasswordRequired:
# As per plan, we don't handle passwords at this stage
raise AssetProcessingError(f"7z archive is password protected. Skipping: {self.input_path.name}")
except Exception as sz_err:
# Catch any other unexpected py7zr errors
raise AssetProcessingError(f"Failed to extract 7z archive {self.input_path.name}: {sz_err}") from sz_err
else:
# If it's a file but not zip, rar, or 7z, treat it as an error for now
# Or could add logic to copy single files? Plan says zip or folder.
raise AssetProcessingError(f"Input file is not a supported archive type (.zip, .rar, .7z): {self.input_path.name}")
elif self.input_path.is_dir():
log.debug(f"Copying directory contents: {self.input_path}")
for item in self.input_path.iterdir():
destination = self.temp_dir / item.name
if item.is_dir():
# Use dirs_exist_ok=True for robustness if Python version supports it (3.8+)
try:
shutil.copytree(item, destination, dirs_exist_ok=True)
except TypeError: # Fallback for older Python
if not destination.exists():
shutil.copytree(item, destination)
else:
log.warning(f"Subdirectory '{item.name}' already exists in temp dir, skipping copytree (potential issue on older Python).")
else:
shutil.copy2(item, destination)
log.info(f"Directory contents copied to {self.temp_dir}")
else:
# This case should be caught by __init__ but included for robustness
raise AssetProcessingError(f"Input path must be a directory or a supported archive file (.zip, .rar, .7z): {self.input_path}")
except AssetProcessingError:
# Re-raise our custom exception directly
raise
except Exception as e:
# Wrap any other unexpected exceptions
raise AssetProcessingError(f"An unexpected error occurred during input extraction for {self.input_path.name}: {e}") from e
def _inventory_and_classify_files(self):
    """
    Scans workspace, classifies files according to preset rules, handling
    16-bit prioritization and multiple variants of the same base map type.

    Populates ``self.classified_files`` with four buckets:
      - "maps":    texture maps matched by keyword/bit-depth rules (base
                   map type only; variant suffixes are assigned per-asset later)
      - "models":  files matched by the preset's model regexes
      - "extra":   files explicitly routed to Extra, plus anything unrecognised
      - "ignored": files superseded by a better variant (8-bit versions
                   for which a 16-bit variant of the same base type exists)

    Classification runs in six steps; a file claimed by an earlier step is
    never reconsidered by a later one (tracked via ``processed_files``).
    """
    if not self.temp_dir:
        raise AssetProcessingError("Temporary workspace not setup before inventory.")
    log.info("Scanning and classifying files...")
    log.debug("--- Starting File Inventory and Classification (v2) ---")
    # Walk the workspace and collect every file as a path relative to temp_dir.
    all_files_rel = []
    for root, _, files in os.walk(self.temp_dir):
        root_path = Path(root)
        for file in files:
            full_path = root_path / file
            relative_path = full_path.relative_to(self.temp_dir)
            all_files_rel.append(relative_path)
    log.debug(f"Found {len(all_files_rel)} files in workspace: {[str(p) for p in all_files_rel]}")
    # --- Initialization ---
    processed_files = set()  # Track relative paths handled (Extra, Models, Ignored, Final Maps)
    potential_map_candidates = []  # List to store potential map file info
    # Reset classified files (important if this method is ever called multiple times)
    self.classified_files = {"maps": [], "models": [], "extra": [], "ignored": []}
    # --- Step 1: Identify Explicit 'Extra' Files ---
    # Files whose NAME matches any configured 'extra' regex go straight to Extra.
    log.debug("Step 1: Checking for files to move to 'Extra' (using regex)...")
    compiled_extra_regex = getattr(self.config, 'compiled_extra_regex', [])
    log.debug(f"  Compiled 'Extra' regex patterns: {[r.pattern for r in compiled_extra_regex]}")
    for file_rel_path in all_files_rel:
        if file_rel_path in processed_files: continue
        for compiled_regex in compiled_extra_regex:
            if compiled_regex.search(file_rel_path.name):
                log.debug(f"  REGEX MATCH FOUND: Marking '{file_rel_path}' for 'Extra' folder based on pattern '{compiled_regex.pattern}'.")
                self.classified_files["extra"].append({'source_path': file_rel_path, 'reason': f'Regex match: {compiled_regex.pattern}'})
                processed_files.add(file_rel_path)
                log.debug(f"  Added '{file_rel_path}' to processed files.")
                break  # Stop checking extra patterns for this file
    # --- Step 2: Identify Model Files ---
    log.debug("Step 2: Identifying model files (using regex)...")
    compiled_model_regex = getattr(self.config, 'compiled_model_regex', [])
    log.debug(f"  Compiled 'Model' regex patterns: {[r.pattern for r in compiled_model_regex]}")
    for file_rel_path in all_files_rel:
        if file_rel_path in processed_files: continue
        for compiled_regex in compiled_model_regex:
            if compiled_regex.search(file_rel_path.name):
                log.debug(f"  REGEX MATCH FOUND: Identified '{file_rel_path}' as model file based on pattern '{compiled_regex.pattern}'.")
                self.classified_files["models"].append({'source_path': file_rel_path})
                processed_files.add(file_rel_path)
                log.debug(f"  Added '{file_rel_path}' to processed files.")
                break  # Stop checking model patterns for this file
    # --- Step 3: Gather Potential Map Candidates (Refactored) ---
    # For each remaining file, try every base map type's keyword regexes
    # against the file STEM. A file yields at most one candidate (first match wins).
    log.debug("Step 3: Gathering potential map candidates (iterating files first)...")
    # Compiled map keyword regex now maps: base_type -> [(regex, keyword, rule_index), ...]
    compiled_map_keyword_regex_tuples = getattr(self.config, 'compiled_map_keyword_regex', {})
    for file_rel_path in all_files_rel:
        # Skip files already classified as Extra or Model
        if file_rel_path in processed_files:
            continue
        file_stem = file_rel_path.stem
        match_found = False
        # Iterate through base types and their associated regex tuples
        for base_map_type, regex_tuples in compiled_map_keyword_regex_tuples.items():
            if match_found: break  # Stop checking types for this file once matched
            # Get the original keywords list for the current rule index.
            # Assuming self.config.map_type_mapping holds the original list of dicts from JSON
            original_rule = None
            # Find the rule based on the first tuple's rule_index (they should all be the same for this base_map_type)
            if regex_tuples:
                current_rule_index = regex_tuples[0][2]  # Get rule_index from the first tuple
                if hasattr(self.config, 'map_type_mapping') and current_rule_index < len(self.config.map_type_mapping):
                    rule_candidate = self.config.map_type_mapping[current_rule_index]
                    # Verify it's the correct rule by checking target_type
                    if rule_candidate.get("target_type") == base_map_type:
                        original_rule = rule_candidate
                    else:
                        log.warning(f"Rule index mismatch for {base_map_type} at index {current_rule_index}. Searching...")
                        # Fallback search if index doesn't match (shouldn't happen ideally)
                        for idx, rule in enumerate(self.config.map_type_mapping):
                            if rule.get("target_type") == base_map_type:
                                original_rule = rule
                                log.warning(f"Found rule for {base_map_type} at index {idx} instead.")
                                break
            original_keywords_list = []
            if original_rule and 'keywords' in original_rule:
                original_keywords_list = original_rule['keywords']
            else:
                log.warning(f"Could not find original keywords list for rule matching base_map_type '{base_map_type}'. Keyword indexing may fail.")
            for kw_regex, original_keyword, rule_index in regex_tuples:
                if kw_regex.search(file_stem):
                    log.debug(f"  Match found: '{file_rel_path}' matches keyword '{original_keyword}' (rule {rule_index}, pattern: '{kw_regex.pattern}') for base type '{base_map_type}'")
                    # Find the index of the matched keyword within its rule's list
                    # (used later as a sort key so earlier keywords win ties).
                    keyword_index_in_rule = -1  # Default if not found
                    if original_keywords_list:
                        try:
                            # Use the original_keyword string directly
                            keyword_index_in_rule = original_keywords_list.index(original_keyword)
                        except ValueError:
                            log.warning(f"Keyword '{original_keyword}' not found in its original rule list? {original_keywords_list}")
                    else:
                        log.warning(f"Original keywords list empty for rule {rule_index}, cannot find index for '{original_keyword}'.")
                    # Add candidate only if not already added
                    if not any(c['source_path'] == file_rel_path for c in potential_map_candidates):
                        potential_map_candidates.append({
                            'source_path': file_rel_path,
                            'matched_keyword': original_keyword,
                            'base_map_type': base_map_type,
                            'preset_rule_index': rule_index,
                            'keyword_index_in_rule': keyword_index_in_rule,  # <<< STORE THE KEYWORD INDEX
                            'is_16bit_source': False
                        })
                    else:
                        log.warning(f"  '{file_rel_path}' was already added as a candidate? Skipping duplicate add.")
                    match_found = True
                    break  # Stop checking regex tuples for this base_type once matched
    log.debug(f"Gathered {len(potential_map_candidates)} potential map candidates based on keywords.")
    # --- Step 3.5: Identify Standalone 16-bit Variants (Not caught by keywords) ---
    # Files matching a bit-depth pattern but no keyword become candidates too.
    # NOTE: these get preset_rule_index 9999 and no 'keyword_index_in_rule'
    # key, so they sort after keyword-matched candidates in Step 5.
    log.debug("Step 3.5: Checking for standalone 16-bit variants...")
    compiled_bit_depth_regex = getattr(self.config, 'compiled_bit_depth_regex_map', {})
    for file_rel_path in all_files_rel:
        # Skip if already processed or already identified as a candidate
        if file_rel_path in processed_files or any(c['source_path'] == file_rel_path for c in potential_map_candidates):
            continue
        for base_type, compiled_regex in compiled_bit_depth_regex.items():
            log.debug(f"  Step 3.5: Checking file '{file_rel_path.name}' against 16-bit pattern for '{base_type}': {compiled_regex.pattern}")  # ADDED LOG
            match = compiled_regex.search(file_rel_path.name)  # Store result
            if match:
                log.debug(f"  --> MATCH FOUND for standalone 16-bit variant: '{file_rel_path}' for base type '{base_type}'")  # MODIFIED LOG
                potential_map_candidates.append({
                    'source_path': file_rel_path,
                    'matched_keyword': 'N/A (16bit variant)',  # Placeholder keyword
                    'base_map_type': base_type,
                    'preset_rule_index': 9999,  # Assign high index to avoid interfering with keyword priority
                    'is_16bit_source': True  # Mark as 16-bit immediately
                })
                log.debug(f"  Added candidate: {potential_map_candidates[-1]}")
                # Don't add to processed_files yet, let Step 4 handle filtering
                break  # Stop checking bit depth patterns for this file
    log.debug(f"Total potential map candidates after checking standalone 16-bit: {len(potential_map_candidates)}")
    # --- Step 4: Prioritize 16-bit Variants & Filter Candidates ---
    # When both an 8-bit and a 16-bit version of the same base type exist,
    # keep only the 16-bit one; the 8-bit file is moved to 'ignored'.
    log.debug("Step 4: Prioritizing 16-bit variants and filtering candidates...")
    compiled_bit_depth_regex = getattr(self.config, 'compiled_bit_depth_regex_map', {})
    candidates_to_keep = []
    candidates_to_ignore = []  # Store 8-bit versions superseded by 16-bit
    # Mark 16-bit candidates
    for candidate in potential_map_candidates:
        base_type = candidate['base_map_type']
        # Check if the base type exists in the bit depth map AND the filename matches the regex
        if base_type in compiled_bit_depth_regex:
            if compiled_bit_depth_regex[base_type].search(candidate['source_path'].name):
                candidate['is_16bit_source'] = True
                log.debug(f"  Marked '{candidate['source_path']}' as 16-bit source for base type '{base_type}'.")
    # Identify base types that have a 16-bit version present
    prioritized_16bit_bases = {
        candidate['base_map_type'] for candidate in potential_map_candidates if candidate['is_16bit_source']
    }
    log.debug(f"  Base map types with 16-bit variants found: {prioritized_16bit_bases}")
    # Filter: Keep 16-bit versions, or 8-bit versions if no 16-bit exists for that base type
    for candidate in potential_map_candidates:
        if candidate['is_16bit_source']:
            candidates_to_keep.append(candidate)
            log.debug(f"  Keeping 16-bit candidate: {candidate['source_path']} ({candidate['base_map_type']})")
        elif candidate['base_map_type'] not in prioritized_16bit_bases:
            candidates_to_keep.append(candidate)
            log.debug(f"  Keeping 8-bit candidate (no 16-bit found): {candidate['source_path']} ({candidate['base_map_type']})")
        else:
            # This is an 8-bit candidate whose 16-bit counterpart exists
            candidates_to_ignore.append(candidate)
            log.debug(f"  Ignoring 8-bit candidate (16-bit found): {candidate['source_path']} ({candidate['base_map_type']})")
    # Add ignored 8-bit files to the main ignored list
    for ignored_candidate in candidates_to_ignore:
        self.classified_files["ignored"].append({
            'source_path': ignored_candidate['source_path'],
            'reason': f'Superseded by 16bit variant for {ignored_candidate["base_map_type"]}'
        })
        processed_files.add(ignored_candidate['source_path'])  # Mark as processed
    log.debug(f"Filtered candidates. Keeping: {len(candidates_to_keep)}, Ignored: {len(candidates_to_ignore)}")
    # --- Step 5: Group, Sort, Assign Suffixes, and Finalize Maps ---
    log.debug("Step 5: Grouping, sorting, assigning suffixes, and finalizing maps...")
    # from collections import defaultdict # Moved import to top of file
    grouped_by_base_type = defaultdict(list)
    for candidate in candidates_to_keep:
        grouped_by_base_type[candidate['base_map_type']].append(candidate)
    final_map_list = []
    for base_map_type, candidates in grouped_by_base_type.items():
        # --- DIAGNOSTIC LOGGING START ---
        candidate_paths_str = [str(c['source_path']) for c in candidates]
        log.debug(f"  [DIAGNOSIS] Processing base_map_type: '{base_map_type}'. Candidates before sort: {candidate_paths_str}")
        # --- DIAGNOSTIC LOGGING END ---
        log.debug(f"  Processing final candidates for base type: '{base_map_type}' ({len(candidates)} candidates)")
        # --- NEW SORTING LOGIC ---
        # Sort candidates based on:
        # 1. The index of the rule object in the preset's map_type_mapping list.
        # 2. The index of the matched keyword within that rule object's 'keywords' list.
        # 3. Alphabetical order of the source file path as a tie-breaker.
        # (Step 3.5 candidates lack 'keyword_index_in_rule', hence the .get fallbacks.)
        candidates.sort(key=lambda c: (
            c.get('preset_rule_index', 9999),  # Use get with fallback for safety
            c.get('keyword_index_in_rule', 9999),  # Use get with fallback for safety
            str(c['source_path'])
        ))
        # --- END NEW SORTING LOGIC ---
        # Add sorted candidates to the final list, but without assigning the suffix yet.
        # Suffix assignment will happen per-asset later (see process()).
        for final_candidate in candidates:  # Use the directly sorted list
            # Store the base map type for now.
            final_map_list.append({
                "map_type": base_map_type,  # Store BASE type only
                "source_path": final_candidate["source_path"],
                "source_keyword": final_candidate["matched_keyword"],
                "is_16bit_source": final_candidate["is_16bit_source"],
                "original_extension": final_candidate["source_path"].suffix.lower()  # Store original extension
            })
            processed_files.add(final_candidate["source_path"])  # Mark final map source as processed
    self.classified_files["maps"] = final_map_list
    # --- Step 6: Classify Remaining Files as 'Unrecognised' (in 'Extra') ---
    log.debug("Step 6: Classifying remaining files as 'Unrecognised'...")
    remaining_count = 0
    for file_rel_path in all_files_rel:
        if file_rel_path not in processed_files:
            log.debug(f"  Marking remaining file '{file_rel_path}' for 'Extra' folder (Unrecognised).")
            self.classified_files["extra"].append({'source_path': file_rel_path, 'reason': 'Unrecognised'})
            remaining_count += 1
            # No need to add to processed_files here, it's the final step
    log.debug(f"  Marked {remaining_count} remaining files as 'Unrecognised'.")
    # --- Final Summary ---
    # Note: self.metadata["source_files_in_extra"] is now populated per-asset in _generate_metadata_file
    log.info(f"File classification complete.")
    log.debug("--- Final Classification Summary (v2) ---")
    map_details_log = [f"{m['map_type']}:{m['source_path']}" for m in self.classified_files["maps"]]
    model_details_log = [str(f['source_path']) for f in self.classified_files["models"]]
    extra_details_log = [f"{str(f['source_path'])} ({f['reason']})" for f in self.classified_files["extra"]]
    ignored_details_log = [f"{str(f['source_path'])} ({f['reason']})" for f in self.classified_files["ignored"]]
    log.debug(f"  Identified Maps ({len(self.classified_files['maps'])}): {map_details_log}")
    log.debug(f"  Model Files ({len(self.classified_files['models'])}): {model_details_log}")
    log.debug(f"  Extra/Unrecognised Files ({len(self.classified_files['extra'])}): {extra_details_log}")
    log.debug(f"  Ignored Files ({len(self.classified_files['ignored'])}): {ignored_details_log}")
    log.debug("--- End File Inventory and Classification (v2) ---")
def _determine_base_metadata(self) -> Tuple[List[str], Dict[Path, Optional[str]]]:
"""
Determines distinct asset base names within the input based on preset rules
and maps each relevant source file to its determined base name.
Returns:
Tuple[List[str], Dict[Path, Optional[str]]]:
- A list of unique, sanitized base names found.
- A dictionary mapping source file relative paths to their determined
base name string (or None if no base name could be determined for that file).
"""
if not self.temp_dir: raise AssetProcessingError("Workspace not setup.")
log.info("Determining distinct base names and file mapping...")
# Combine map and model files for base name determination
relevant_files = self.classified_files.get('maps', []) + self.classified_files.get('models', [])
if not relevant_files:
log.warning("No map or model files found to determine base name(s).")
# Fallback: Use input path name as a single asset
input_name = self.input_path.stem if self.input_path.is_file() else self.input_path.name
sanitized_input_name = self._sanitize_filename(input_name or "UnknownInput")
# Map all files (maps, models, extra, ignored) to this fallback name
all_files_paths = [f['source_path'] for cat in self.classified_files.values() for f in cat if 'source_path' in f]
file_to_base_name_map = {f_path: sanitized_input_name for f_path in all_files_paths}
log.info(f"Using input path name '{sanitized_input_name}' as the single asset name.")
return [sanitized_input_name], file_to_base_name_map
# --- Determine Base Names from Files ---
separator = self.config.source_naming_separator
indices_dict = self.config.source_naming_indices
base_index_raw = indices_dict.get('base_name')
base_index = None
if base_index_raw is not None:
try:
base_index = int(base_index_raw)
except (ValueError, TypeError):
log.warning(f"Could not convert base_name index '{base_index_raw}' to integer. Base name determination might be inaccurate.")
file_to_base_name_map: Dict[Path, Optional[str]] = {}
potential_base_names_per_file: Dict[Path, str] = {} # Store potential name for each file path
if isinstance(base_index, int):
log.debug(f"Attempting base name extraction using separator '{separator}' and index {base_index}.")
for file_info in relevant_files:
file_path = file_info['source_path']
stem = file_path.stem
parts = stem.split(separator)
if len(parts) > base_index:
extracted_name = parts[base_index]
sanitized_name = self._sanitize_filename(extracted_name)
if sanitized_name: # Ensure we don't add empty names
potential_base_names_per_file[file_path] = sanitized_name
log.debug(f" File '{file_path.name}' -> Potential Base Name: '{sanitized_name}'")
else:
log.debug(f" File '{file_path.name}' -> Extracted empty name at index {base_index}. Marking as None.")
file_to_base_name_map[file_path] = None # Explicitly mark as None if extraction yields empty
else:
log.debug(f" File '{file_path.name}' -> Stem '{stem}' has too few parts ({len(parts)}) for index {base_index}. Marking as None.")
file_to_base_name_map[file_path] = None # Mark as None if index is invalid for this file
else:
log.warning("Base name index not configured or invalid. Cannot determine distinct assets based on index. Treating as single asset.")
# Fallback to common prefix if no valid index
stems = [f['source_path'].stem for f in relevant_files]
common_prefix_name = os.path.commonprefix(stems) if stems else ""
sanitized_common_name = self._sanitize_filename(common_prefix_name or self.input_path.stem or "UnknownAsset")
log.info(f"Using common prefix '{sanitized_common_name}' as the single asset name.")
# Map all relevant files to this single name
for file_info in relevant_files:
potential_base_names_per_file[file_info['source_path']] = sanitized_common_name
# --- Consolidate Distinct Names and Final Mapping ---
distinct_base_names_set = set(potential_base_names_per_file.values())
distinct_base_names = sorted(list(distinct_base_names_set)) # Sort for consistent processing order
# Populate the final map, including files that didn't match the index rule (marked as None earlier)
for file_info in relevant_files:
file_path = file_info['source_path']
if file_path not in file_to_base_name_map: # If not already marked as None
file_to_base_name_map[file_path] = potential_base_names_per_file.get(file_path) # Assign determined name or None if somehow missed
# Add files from 'extra' and 'ignored' to the map, marking them as None for base name
for category in ['extra', 'ignored']:
for file_info in self.classified_files.get(category, []):
file_path = file_info['source_path']
if file_path not in file_to_base_name_map: # Avoid overwriting if somehow already mapped
file_to_base_name_map[file_path] = None
log.debug(f" File '{file_path.name}' (Category: {category}) -> Marked as None (No Base Name).")
if not distinct_base_names:
# This case should be rare due to fallbacks, but handle it.
log.warning("No distinct base names could be determined. Using input name as fallback.")
input_name = self.input_path.stem if self.input_path.is_file() else self.input_path.name
fallback_name = self._sanitize_filename(input_name or "FallbackAsset")
distinct_base_names = [fallback_name]
# Remap all files to this single fallback name
file_to_base_name_map = {f_path: fallback_name for f_path in file_to_base_name_map.keys()}
log.info(f"Determined {len(distinct_base_names)} distinct asset base name(s): {distinct_base_names}")
log.debug(f"File-to-BaseName Map ({len(file_to_base_name_map)} entries): { {str(k): v for k, v in file_to_base_name_map.items()} }") # Log string paths for readability
return distinct_base_names, file_to_base_name_map
def _determine_single_asset_metadata(self, asset_base_name: str, filtered_classified_files: Dict[str, List[Dict]]) -> Dict[str, str]:
    """
    Derive the asset_category and archetype for one specific asset.

    The decision is based purely on the files already classified as
    belonging to this asset (models, maps, extras).

    Args:
        asset_base_name: The determined base name for this specific asset.
        filtered_classified_files: Classified files (maps, models, extra, ...)
            belonging only to this asset.

    Returns:
        A dictionary of the form {"asset_category": str, "archetype": str}.
    """
    log.debug(f"Determining category and archetype for asset: '{asset_base_name}'")

    # --- Asset category: model present => Asset; decal keyword => Decal; else default ---
    category = self.config.default_asset_category
    if filtered_classified_files.get("models"):
        category = "Asset"
        log.debug(f" Category set to 'Asset' for '{asset_base_name}' due to model file presence.")
    else:
        decal_keywords = self.config.asset_category_rules.get('decal_keywords', [])
        # Prefer map filenames for the keyword scan; fall back to 'extra'
        # files when this asset has no maps at all.
        candidate_paths = [f['source_path'] for f in filtered_classified_files.get('maps', [])]
        if not candidate_paths:
            candidate_paths = [f['source_path'] for f in filtered_classified_files.get('extra', [])]
        keyword_hit = bool(decal_keywords) and any(
            keyword.lower() in path.name.lower()
            for path in candidate_paths
            for keyword in decal_keywords
        )
        if keyword_hit:
            category = "Decal"
            log.debug(f" Category set to 'Decal' for '{asset_base_name}' due to keyword match.")
    log.debug(f" Determined Category for '{asset_base_name}': {category}")

    # --- Archetype (usage): first matching rule wins ---
    archetype = "Unknown"
    check_stems = [f['source_path'].stem.lower() for f in filtered_classified_files.get('maps', [])]
    check_stems.extend(f['source_path'].stem.lower() for f in filtered_classified_files.get('models', []))
    check_stems.append(asset_base_name.lower())  # the base name itself also participates
    for rule in self.config.archetype_rules:
        # Skip malformed rules that are not (name, {"match_any": [...]}) pairs.
        if len(rule) != 2 or not isinstance(rule[1], dict):
            continue
        arch_name, rules_dict = rule
        match_any = rules_dict.get("match_any", [])
        # Simple substring match of any rule keyword against any stem.
        if match_any and any(kw.lower() in stem for kw in match_any for stem in check_stems):
            archetype = arch_name
            log.debug(f" Archetype match '{arch_name}' for '{asset_base_name}' based on keywords: {match_any}")
            break
    log.debug(f" Determined Archetype for '{asset_base_name}': {archetype}")
    return {"asset_category": category, "archetype": archetype}
def _process_individual_maps(self, filtered_maps_list: List[Dict], current_asset_metadata: Dict, loaded_data_cache: dict) -> Tuple[Dict[str, Dict[str, Dict]], Dict[str, Dict], str, List[Dict]]:
    """
    Processes, resizes, and saves classified map files for a specific asset
    that are NOT used as inputs for merge rules. Uses helper functions.

    Side effect: mutates current_asset_metadata by storing per-map-type
    details under the "map_details" key (source bit depth, gloss inversion,
    output format).

    Args:
        filtered_maps_list: List of map dictionaries belonging to the current asset.
        current_asset_metadata: Metadata dictionary for the current asset (mutated in place).
        loaded_data_cache: Cache dictionary for loaded/resized source data.

    Returns:
        Tuple containing:
            - processed_maps_details_asset: Dict mapping map_type to resolution details.
            - image_stats_asset: Dict mapping map_type to calculated image statistics.
            - aspect_ratio_change_string_asset: String indicating aspect ratio change.
            - ignored_rough_maps: List of map dictionaries for native rough maps
              ignored due to gloss priority.
    """
    if not self.temp_dir: raise AssetProcessingError("Workspace not setup.")
    asset_name = current_asset_metadata.get("asset_name", "UnknownAsset")
    log.info(f"Processing individual map files for asset '{asset_name}'...")
    # Initialize results specific to this asset
    processed_maps_details_asset: Dict[str, Dict[str, Dict]] = defaultdict(dict)
    image_stats_asset: Dict[str, Dict] = {}
    map_details_asset: Dict[str, Dict] = {} # Store details like source bit depth, gloss inversion
    aspect_ratio_change_string_asset: str = "N/A"
    ignored_rough_maps: List[Dict] = [] # Store ignored native rough maps
    # --- Settings retrieval ---
    resolutions = self.config.image_resolutions
    stats_res_key = self.config.calculate_stats_resolution
    stats_target_dim = resolutions.get(stats_res_key)
    if not stats_target_dim: log.warning(f"Stats resolution key '{stats_res_key}' not found. Stats skipped for '{asset_name}'.")
    gloss_keywords = self.config.source_glossiness_keywords
    # target_pattern = self.config.target_filename_pattern # Not needed here, handled by _save_image
    base_name = asset_name # Use the asset name passed in
    # --- Pre-process Glossiness -> Roughness ---
    # This logic needs to stay here to determine which ROUGH source to use
    # and potentially ignore the native one.
    derived_from_gloss_flag = {}
    gloss_map_info_for_rough, native_rough_map_info = None, None
    for map_info in filtered_maps_list:
        # Use the final assigned map_type (e.g., ROUGH, ROUGH-1)
        if map_info['map_type'].startswith('ROUGH'):
            is_gloss = any(kw.lower() in map_info['source_path'].stem.lower() for kw in gloss_keywords)
            if is_gloss:
                # If multiple gloss sources map to ROUGH variants, prioritize the first one?
                # For now, assume only one gloss source maps to ROUGH variants.
                if gloss_map_info_for_rough is None: gloss_map_info_for_rough = map_info
            else:
                # If multiple native rough sources map to ROUGH variants, prioritize the first one?
                if native_rough_map_info is None: native_rough_map_info = map_info
    # NOTE(review): rough_source_to_use_info is assigned below but never read in
    # this method — kept for parity / possible future use.
    rough_source_to_use_info = None # Store the map_info dict of the source to use
    if gloss_map_info_for_rough:
        rough_source_to_use_info = gloss_map_info_for_rough
        derived_from_gloss_flag['ROUGH'] = True # Apply to all ROUGH variants if derived from gloss
        if native_rough_map_info:
            log.warning(f"Asset '{asset_name}': Both Gloss source ('{gloss_map_info_for_rough['source_path']}') and Rough source ('{native_rough_map_info['source_path']}') found for ROUGH maps. Prioritizing Gloss.")
            ignored_rough_maps.append({'source_path': native_rough_map_info['source_path'], 'reason': 'Superseded by Gloss->Rough'})
    elif native_rough_map_info:
        rough_source_to_use_info = native_rough_map_info
        derived_from_gloss_flag['ROUGH'] = False
    # --- Identify maps used in merge rules ---
    # Maps consumed by merge rules are handled in _merge_maps_from_source
    # instead of being saved individually here.
    merge_input_map_types = set()
    for rule in self.config.map_merge_rules:
        inputs_mapping = rule.get("inputs", {})
        for source_map_type in inputs_mapping.values():
            # Use the base type for checking against merge rules
            base_type = _get_base_map_type(source_map_type)
            merge_input_map_types.add(base_type)
    log.debug(f"Map types used as input for merge rules: {merge_input_map_types}")
    # --- Filter maps to process individually ---
    maps_to_process_individually = []
    for map_info in filtered_maps_list:
        base_map_type = _get_base_map_type(map_info['map_type'])
        # Skip if this base map type is used in *any* merge rule input
        if base_map_type in merge_input_map_types:
            log.debug(f"Skipping individual processing for {map_info['map_type']} ({map_info['source_path']}) as its base type '{base_map_type}' is used in merge rules.")
            continue
        # Skip native rough map if gloss was prioritized
        if map_info['map_type'].startswith('ROUGH') and any(ignored['source_path'] == map_info['source_path'] for ignored in ignored_rough_maps):
            log.debug(f"Skipping individual processing of native rough map '{map_info['source_path']}' as gloss version was prioritized.")
            continue
        maps_to_process_individually.append(map_info)
    log.info(f"Processing {len(maps_to_process_individually)} maps individually for asset '{asset_name}'...")
    # --- Aspect Ratio Calculation Setup ---
    # We need original dimensions once per asset for aspect ratio.
    # Find the first map to process to get its dimensions.
    first_map_info_for_aspect = next((m for m in maps_to_process_individually), None)
    orig_w_aspect, orig_h_aspect = None, None
    if first_map_info_for_aspect:
        # Load just to get dimensions (might hit cache if used later)
        # Use the first resolution key as a representative target for loading
        first_res_key = next(iter(resolutions))
        temp_img_for_dims, _ = self._load_and_transform_source(
            first_map_info_for_aspect['source_path'],
            first_map_info_for_aspect['map_type'],
            first_res_key,
            False, # is_gloss_source doesn't matter for dims
            loaded_data_cache # Use the main cache
        )
        if temp_img_for_dims is not None:
            orig_h_aspect, orig_w_aspect = temp_img_for_dims.shape[:2]
            log.debug(f"Got original dimensions ({orig_w_aspect}x{orig_h_aspect}) for aspect ratio calculation from {first_map_info_for_aspect['source_path']}")
        else:
            log.warning(f"Could not load image {first_map_info_for_aspect['source_path']} to get original dimensions for aspect ratio.")
    else:
        log.warning("No maps found to process individually, cannot calculate aspect ratio string.")
    # --- Process Each Individual Map ---
    for map_info in maps_to_process_individually:
        map_type = map_info['map_type'] # Final type (e.g., COL-1)
        source_path_rel = map_info['source_path']
        original_extension = map_info.get('original_extension', '.png')
        # Determine if this specific map type should use gloss inversion logic
        # If ROUGH-1, ROUGH-2 etc derive from gloss, they all use inversion
        is_gloss_source_for_this_map = map_type.startswith('ROUGH') and derived_from_gloss_flag.get('ROUGH', False)
        log.info(f"-- Asset '{asset_name}': Processing Individual Map: {map_type} (Source: {source_path_rel.name}) --")
        current_map_details = {"derived_from_gloss": is_gloss_source_for_this_map}
        source_bit_depth_found = None # Track if we've found the bit depth for this map type
        try:
            # --- Loop through target resolutions ---
            for res_key, target_dim_px in resolutions.items():
                log.debug(f"Processing {map_type} for resolution: {res_key}...")
                # --- 1. Load and Transform Source (using helper + cache) ---
                img_resized, source_dtype = self._load_and_transform_source(
                    source_path_rel=source_path_rel,
                    map_type=map_type, # Pass the specific map type (e.g., ROUGH-1)
                    target_resolution_key=res_key,
                    is_gloss_source=is_gloss_source_for_this_map,
                    cache=loaded_data_cache
                )
                if img_resized is None:
                    log.warning(f"Failed to load/transform source {source_path_rel} for {res_key}. Skipping resolution.")
                    continue # Skip this resolution
                # Store source bit depth once found
                # NOTE(review): both branches of the inner ternary yield 8, so any
                # non-uint16/uint8 dtype (e.g. float) is recorded as 8-bit here.
                if source_dtype is not None and source_bit_depth_found is None:
                    source_bit_depth_found = 16 if source_dtype == np.uint16 else (8 if source_dtype == np.uint8 else 8) # Default non-uint to 8
                    current_map_details["source_bit_depth"] = source_bit_depth_found
                    log.debug(f"Stored source bit depth for {map_type}: {source_bit_depth_found}")
                # --- 2. Calculate Stats (if applicable) ---
                if res_key == stats_res_key and stats_target_dim:
                    log.debug(f"Calculating stats for {map_type} using {res_key} image...")
                    stats = _calculate_image_stats(img_resized)
                    if stats: image_stats_asset[map_type] = stats
                    else: log.warning(f"Stats calculation failed for {map_type} at {res_key}.")
                # --- 3. Calculate Aspect Ratio Change String (once per asset, using pre-calculated dims) ---
                if aspect_ratio_change_string_asset == "N/A" and orig_w_aspect is not None and orig_h_aspect is not None:
                    target_w_aspect, target_h_aspect = img_resized.shape[1], img_resized.shape[0] # Use current resized dims
                    try:
                        aspect_string = self._normalize_aspect_ratio_change(orig_w_aspect, orig_h_aspect, target_w_aspect, target_h_aspect)
                        aspect_ratio_change_string_asset = aspect_string
                        log.debug(f"Stored aspect ratio change string using {res_key}: '{aspect_string}'")
                    except Exception as aspect_err:
                        log.error(f"Failed to calculate aspect ratio change string using {res_key}: {aspect_err}", exc_info=True)
                        aspect_ratio_change_string_asset = "Error"
                elif aspect_ratio_change_string_asset == "N/A":
                    # This case happens if we couldn't get original dims
                    aspect_ratio_change_string_asset = "Unknown" # Set to unknown instead of recalculating
                # --- 4. Save Image (using helper) ---
                source_info = {
                    'original_extension': original_extension,
                    'source_bit_depth': source_bit_depth_found or 8, # Use found depth or default
                    'involved_extensions': {original_extension} # Only self for individual maps
                }
                bit_depth_rule = self.config.get_bit_depth_rule(map_type) # Get rule for this specific map type
                save_result = self._save_image(
                    image_data=img_resized,
                    map_type=map_type,
                    resolution_key=res_key,
                    asset_base_name=base_name,
                    source_info=source_info,
                    output_bit_depth_rule=bit_depth_rule,
                    temp_dir=self.temp_dir
                )
                # --- 5. Store Result ---
                if save_result:
                    processed_maps_details_asset.setdefault(map_type, {})[res_key] = save_result
                    # Update overall map detail (e.g., final format) if needed
                    current_map_details["output_format"] = save_result.get("format")
                else:
                    log.error(f"Failed to save {map_type} at {res_key}.")
                    processed_maps_details_asset.setdefault(map_type, {})[f'error_{res_key}'] = "Save failed"
        except Exception as map_proc_err:
            log.error(f"Failed processing map {map_type} from {source_path_rel.name}: {map_proc_err}", exc_info=True)
            processed_maps_details_asset.setdefault(map_type, {})['error'] = str(map_proc_err)
        # Store collected details for this map type
        map_details_asset[map_type] = current_map_details
    # --- Final Metadata Updates (Handled in main process loop) ---
    # Update the passed-in current_asset_metadata dictionary directly with map_details
    # This avoids returning it and merging later.
    current_asset_metadata["map_details"] = map_details_asset
    log.info(f"Finished processing individual map files for asset '{asset_name}'.")
    return processed_maps_details_asset, image_stats_asset, aspect_ratio_change_string_asset, ignored_rough_maps
def _merge_maps_from_source(self, processed_maps_details_asset: Dict[str, Dict[str, Dict]], filtered_classified_files: Dict[str, List[Dict]], current_asset_metadata: Dict, loaded_data_cache: dict) -> Dict[str, Dict[str, Dict]]:
    """
    Merges channels from different SOURCE maps for a specific asset based on rules
    in configuration, using helper functions for loading and saving.

    Side effect: when the current resolution matches the configured stats
    resolution, ROUGH-channel statistics are stored into
    current_asset_metadata["merged_map_channel_stats"].

    Args:
        processed_maps_details_asset: Details of processed maps (used to find common resolutions).
        filtered_classified_files: Classified files dictionary filtered for this asset (used to find source paths).
        current_asset_metadata: Metadata dictionary for the current asset (mutated in place).
        loaded_data_cache: Cache dictionary for loaded/resized source data.

    Returns:
        Dict[str, Dict[str, Dict]]: Details of the merged maps created for this asset.
    """
    if not self.temp_dir: raise AssetProcessingError("Workspace not setup.")
    asset_name = current_asset_metadata.get("asset_name", "UnknownAsset")
    gloss_keywords = self.config.source_glossiness_keywords # Get gloss keywords
    merge_rules = self.config.map_merge_rules
    log.info(f"Asset '{asset_name}': Applying {len(merge_rules)} map merging rule(s) from source...")
    # Initialize results for this asset
    merged_maps_details_asset: Dict[str, Dict[str, Dict]] = defaultdict(dict)
    for rule_index, rule in enumerate(merge_rules):
        output_map_type = rule.get("output_map_type")
        inputs_mapping = rule.get("inputs") # e.g., {"R": "AO", "G": "ROUGH", "B": "METAL"}
        defaults = rule.get("defaults", {})
        rule_bit_depth = rule.get("output_bit_depth", "respect_inputs")
        if not output_map_type or not inputs_mapping:
            log.warning(f"Asset '{asset_name}': Skipping merge rule #{rule_index+1}: Missing 'output_map_type' or 'inputs'. Rule: {rule}")
            continue
        log.info(f"-- Asset '{asset_name}': Applying merge rule for '{output_map_type}' --")
        # --- Find required SOURCE files and their details for this asset ---
        required_input_sources = {} # map_type -> {'source_path': Path, 'original_extension': str, 'is_gloss_source': bool}
        possible_to_find_sources = True
        for input_type in set(inputs_mapping.values()): # e.g., {"AO", "ROUGH", "METAL"}
            found_source_for_type = False
            # Search in the filtered classified maps for this asset
            for classified_map in filtered_classified_files.get("maps", []):
                # Check if the classified map's type matches the required input type
                # This needs to handle variants (e.g., ROUGH-1 should match ROUGH)
                if classified_map['map_type'].startswith(input_type):
                    source_path_rel = classified_map.get('source_path')
                    if not source_path_rel: continue # Skip if path is missing
                    # Determine if this source is gloss (only relevant if input_type is ROUGH)
                    is_gloss = False
                    if input_type == 'ROUGH':
                        is_gloss = any(kw.lower() in source_path_rel.stem.lower() for kw in gloss_keywords)
                        # Prioritize gloss source if both exist (logic from _process_individual_maps)
                        native_rough_exists = any(m['map_type'].startswith('ROUGH') and not any(gk.lower() in m['source_path'].stem.lower() for gk in gloss_keywords) for m in filtered_classified_files.get("maps", []))
                        if is_gloss and native_rough_exists:
                            log.debug(f"Merge input '{input_type}': Prioritizing gloss source '{source_path_rel}' over native rough.")
                        elif not is_gloss and native_rough_exists and any(m['map_type'].startswith('ROUGH') and any(gk.lower() in m['source_path'].stem.lower() for gk in gloss_keywords) for m in filtered_classified_files.get("maps", [])):
                            log.debug(f"Merge input '{input_type}': Skipping native rough source '{source_path_rel}' because gloss source exists.")
                            continue # Skip this native rough source
                    required_input_sources[input_type] = {
                        'source_path': source_path_rel,
                        'original_extension': classified_map.get('original_extension', '.png'),
                        'is_gloss_source': is_gloss
                    }
                    found_source_for_type = True
                    log.debug(f"Found source for merge input '{input_type}': {source_path_rel} (Gloss: {is_gloss})")
                    break # Found the first matching source for this input type
            if not found_source_for_type:
                log.warning(f"Asset '{asset_name}': Required source file for input map type '{input_type}' not found in classified files. Cannot perform merge for '{output_map_type}'.")
                possible_to_find_sources = False
                break
        if not possible_to_find_sources:
            continue # Skip this merge rule
        # --- Determine common resolutions based on *processed* maps (as a proxy for available sizes) ---
        # This assumes _process_individual_maps ran first and populated processed_maps_details_asset
        possible_resolutions_per_input = []
        for input_type in set(inputs_mapping.values()):
            if input_type in processed_maps_details_asset:
                res_keys = {res for res, details in processed_maps_details_asset[input_type].items() if isinstance(details, dict) and 'error' not in details}
                if not res_keys:
                    log.warning(f"Asset '{asset_name}': Input map type '{input_type}' for merge rule '{output_map_type}' has no successfully processed resolutions (needed for size check).")
                    possible_resolutions_per_input = []
                    break
                possible_resolutions_per_input.append(res_keys)
            else:
                # This case might happen if the input map is *only* used for merging
                # We need a way to determine available resolutions without relying on prior processing.
                # For now, we'll rely on the check above ensuring the source exists.
                # We'll load the source at *all* target resolutions and let _load_and_transform_source handle skipping if upscale is needed.
                log.debug(f"Input map type '{input_type}' for merge rule '{output_map_type}' might not have been processed individually. Will attempt loading source for all target resolutions.")
                # Add all configured resolutions as possibilities for this input
                possible_resolutions_per_input.append(set(self.config.image_resolutions.keys()))
        if not possible_resolutions_per_input:
            log.warning(f"Asset '{asset_name}': Cannot determine common resolutions for '{output_map_type}'. Skipping rule.")
            continue
        common_resolutions = set.intersection(*possible_resolutions_per_input)
        if not common_resolutions:
            log.warning(f"Asset '{asset_name}': No common resolutions found among required inputs {set(inputs_mapping.values())} for merge rule '{output_map_type}'. Skipping rule.")
            continue
        log.debug(f"Asset '{asset_name}': Common resolutions for '{output_map_type}': {common_resolutions}")
        # --- Loop through common resolutions ---
        # Process largest resolution first (sorted by pixel dimension, descending).
        res_order = {k: self.config.image_resolutions[k] for k in common_resolutions if k in self.config.image_resolutions}
        if not res_order:
            log.warning(f"Asset '{asset_name}': Common resolutions {common_resolutions} do not match config. Skipping merge for '{output_map_type}'.")
            continue
        sorted_res_keys = sorted(res_order.keys(), key=lambda k: res_order[k], reverse=True)
        base_name = asset_name # Use current asset's name
        for current_res_key in sorted_res_keys:
            log.debug(f"Asset '{asset_name}': Merging '{output_map_type}' for resolution: {current_res_key}")
            try:
                loaded_inputs_data = {} # map_type -> loaded numpy array
                source_info_for_save = {'involved_extensions': set(), 'max_input_bit_depth': 8}
                # --- Load required SOURCE maps using helper ---
                possible_to_load = True
                target_channels = list(inputs_mapping.keys()) # e.g., ['R', 'G', 'B']
                for map_type in set(inputs_mapping.values()): # e.g., {"AO", "ROUGH", "METAL"}
                    source_details = required_input_sources.get(map_type)
                    if not source_details:
                        log.error(f"Internal Error: Source details missing for '{map_type}' during merge load.")
                        possible_to_load = False; break
                    source_path_rel = source_details['source_path']
                    is_gloss = source_details['is_gloss_source']
                    original_ext = source_details['original_extension']
                    source_info_for_save['involved_extensions'].add(original_ext)
                    log.debug(f"Loading source '{source_path_rel}' for merge input '{map_type}' at {current_res_key} (Gloss: {is_gloss})")
                    img_resized, source_dtype = self._load_and_transform_source(
                        source_path_rel=source_path_rel,
                        map_type=map_type, # Pass the base map type (e.g., ROUGH)
                        target_resolution_key=current_res_key,
                        is_gloss_source=is_gloss,
                        cache=loaded_data_cache
                    )
                    if img_resized is None:
                        log.warning(f"Asset '{asset_name}': Failed to load/transform source '{source_path_rel}' for merge input '{map_type}' at {current_res_key}. Skipping resolution.")
                        possible_to_load = False; break
                    loaded_inputs_data[map_type] = img_resized
                    # Track max source bit depth
                    if source_dtype == np.uint16:
                        source_info_for_save['max_input_bit_depth'] = max(source_info_for_save['max_input_bit_depth'], 16)
                    # Add other dtype checks if needed (e.g., float32 -> 16?)
                if not possible_to_load: continue
                # --- Calculate Stats for ROUGH source if used and at stats resolution ---
                stats_res_key = self.config.calculate_stats_resolution
                if current_res_key == stats_res_key:
                    log.debug(f"Asset '{asset_name}': Checking for ROUGH source stats for '{output_map_type}' at {stats_res_key}")
                    for target_channel, source_map_type in inputs_mapping.items():
                        if source_map_type == 'ROUGH' and source_map_type in loaded_inputs_data:
                            log.debug(f"Asset '{asset_name}': Calculating stats for ROUGH source (mapped to channel '{target_channel}') for '{output_map_type}' at {stats_res_key}")
                            rough_image_data = loaded_inputs_data[source_map_type]
                            rough_stats = _calculate_image_stats(rough_image_data)
                            if rough_stats:
                                # Ensure the nested dictionary structure exists
                                if "merged_map_channel_stats" not in current_asset_metadata:
                                    current_asset_metadata["merged_map_channel_stats"] = {}
                                if output_map_type not in current_asset_metadata["merged_map_channel_stats"]:
                                    current_asset_metadata["merged_map_channel_stats"][output_map_type] = {}
                                if target_channel not in current_asset_metadata["merged_map_channel_stats"][output_map_type]:
                                    current_asset_metadata["merged_map_channel_stats"][output_map_type][target_channel] = {}
                                current_asset_metadata["merged_map_channel_stats"][output_map_type][target_channel][stats_res_key] = rough_stats
                                log.debug(f"Asset '{asset_name}': Stored ROUGH stats for '{output_map_type}' channel '{target_channel}' at {stats_res_key}: {rough_stats}")
                            else:
                                log.warning(f"Asset '{asset_name}': Failed to calculate ROUGH stats for '{output_map_type}' channel '{target_channel}' at {stats_res_key}.")
                # --- Determine dimensions ---
                # All loaded inputs should have the same dimensions for this resolution
                first_map_type = next(iter(loaded_inputs_data))
                h, w = loaded_inputs_data[first_map_type].shape[:2]
                num_target_channels = len(target_channels)
                # --- Prepare and Merge Channels ---
                # Channels are normalized to float32 in [0, 1] before merging.
                merged_channels_float32 = []
                for target_channel in target_channels: # e.g., 'R', 'G', 'B'
                    source_map_type = inputs_mapping.get(target_channel) # e.g., "AO", "ROUGH", "METAL"
                    channel_data_float32 = None
                    if source_map_type and source_map_type in loaded_inputs_data:
                        img_input = loaded_inputs_data[source_map_type] # Get the loaded NumPy array
                        # Ensure input is float32 0-1 range for merging
                        if img_input.dtype == np.uint16: img_float = img_input.astype(np.float32) / 65535.0
                        elif img_input.dtype == np.uint8: img_float = img_input.astype(np.float32) / 255.0
                        elif img_input.dtype == np.float16: img_float = img_input.astype(np.float32) # Assume float16 is 0-1
                        else: img_float = img_input.astype(np.float32) # Assume other floats are 0-1
                        num_source_channels = img_float.shape[2] if len(img_float.shape) == 3 else 1
                        # Extract the correct channel
                        if num_source_channels >= 3:
                            if target_channel == 'R': channel_data_float32 = img_float[:, :, 0]
                            elif target_channel == 'G': channel_data_float32 = img_float[:, :, 1]
                            elif target_channel == 'B': channel_data_float32 = img_float[:, :, 2]
                            elif target_channel == 'A' and num_source_channels == 4: channel_data_float32 = img_float[:, :, 3]
                            else: log.warning(f"Target channel '{target_channel}' invalid for 3/4 channel source '{source_map_type}'.")
                        elif num_source_channels == 1 or len(img_float.shape) == 2:
                            # If source is grayscale, use it for R, G, B, or A target channels
                            channel_data_float32 = img_float.reshape(h, w)
                        else:
                            log.warning(f"Unexpected shape {img_float.shape} for source '{source_map_type}'.")
                    # Apply default if channel data couldn't be extracted
                    if channel_data_float32 is None:
                        default_val = defaults.get(target_channel)
                        if default_val is None:
                            raise AssetProcessingError(f"Missing input/default for target channel '{target_channel}' in merge rule '{output_map_type}'.")
                        log.debug(f"Using default value {default_val} for target channel '{target_channel}' in '{output_map_type}'.")
                        channel_data_float32 = np.full((h, w), float(default_val), dtype=np.float32)
                    merged_channels_float32.append(channel_data_float32)
                if not merged_channels_float32 or len(merged_channels_float32) != num_target_channels:
                    raise AssetProcessingError(f"Channel count mismatch during merge for '{output_map_type}'. Expected {num_target_channels}, got {len(merged_channels_float32)}.")
                merged_image_float32 = cv2.merge(merged_channels_float32)
                log.debug(f"Merged channels for '{output_map_type}' ({current_res_key}). Result shape: {merged_image_float32.shape}, dtype: {merged_image_float32.dtype}")
                # --- Save Merged Map using Helper ---
                save_result = self._save_image(
                    image_data=merged_image_float32, # Pass the merged float32 data
                    map_type=output_map_type,
                    resolution_key=current_res_key,
                    asset_base_name=base_name,
                    source_info=source_info_for_save, # Pass collected source info
                    output_bit_depth_rule=rule_bit_depth, # Pass the rule's requirement
                    temp_dir=self.temp_dir
                )
                # --- Record details locally ---
                if save_result:
                    merged_maps_details_asset[output_map_type][current_res_key] = save_result
                else:
                    log.error(f"Asset '{asset_name}': Failed to save merged map '{output_map_type}' at resolution '{current_res_key}'.")
                    merged_maps_details_asset.setdefault(output_map_type, {})[f'error_{current_res_key}'] = "Save failed via helper"
            except Exception as merge_res_err:
                log.error(f"Asset '{asset_name}': Failed merging '{output_map_type}' at resolution '{current_res_key}': {merge_res_err}", exc_info=True)
                # Store error locally for this asset
                merged_maps_details_asset.setdefault(output_map_type, {})[f'error_{current_res_key}'] = str(merge_res_err)
    log.info(f"Asset '{asset_name}': Finished applying map merging rules.")
    # Return the details for this asset
    return merged_maps_details_asset
def _generate_metadata_file(self, current_asset_metadata: Dict, processed_maps_details_asset: Dict[str, Dict[str, Dict]], merged_maps_details_asset: Dict[str, Dict[str, Dict]], filtered_classified_files_asset: Dict[str, List[Dict]], unmatched_files_paths: List[Path], map_details_asset: Dict[str, Dict]) -> Path:
    """
    Assemble the per-asset metadata dictionary and write it to a temp JSON file.

    Args:
        current_asset_metadata: Base metadata for this asset (name, category, archetype, ...).
        processed_maps_details_asset: Per-resolution details of individually processed maps.
        merged_maps_details_asset: Per-resolution details of merged maps.
        filtered_classified_files_asset: Classified files belonging only to this asset.
        unmatched_files_paths: Relative paths of files not matched to any base name.
        map_details_asset: Per-map-type details (source bit depth, gloss inversion, ...).

    Returns:
        Path to the generated temporary metadata file.

    Raises:
        AssetProcessingError: If the workspace is missing or the file cannot be written.
    """
    if not self.temp_dir:
        raise AssetProcessingError("Workspace not setup.")

    asset_name = current_asset_metadata.get("asset_name")
    if not asset_name or asset_name == "UnknownAssetName":
        log.warning("Asset name unknown during metadata generation, file may be incomplete or incorrectly named.")
        asset_name = "UnknownAsset_Metadata"  # fallback used for the filename
    log.info(f"Generating metadata file for asset '{asset_name}'...")

    # Start from a copy of the base metadata for this asset.
    final_metadata = current_asset_metadata.copy()
    final_metadata["merged_map_channel_stats"] = current_asset_metadata.get("merged_map_channel_stats", {})

    def _successful_resolutions(details_by_type):
        # Keep only map types that have at least one resolution saved without error.
        result = {}
        for m_type, by_res in details_by_type.items():
            ok = sorted(res for res, d in by_res.items() if isinstance(d, dict) and 'error' not in d)
            if ok:
                result[m_type] = ok
        return result

    final_metadata["processed_map_resolutions"] = _successful_resolutions(processed_maps_details_asset)
    final_metadata["merged_map_resolutions"] = _successful_resolutions(merged_maps_details_asset)
    final_metadata["maps_present"] = sorted(processed_maps_details_asset.keys())
    final_metadata["merged_maps"] = sorted(merged_maps_details_asset.keys())

    # Shader features derived from this asset's map set.
    features = set()
    for m_type, details in map_details_asset.items():
        if m_type in ("SSS", "FUZZ", "MASK"):
            features.add(m_type)
        if details.get("derived_from_gloss"):
            features.add("InvertedGloss")
        per_res = processed_maps_details_asset.get(m_type, {})
        if any(info.get("bit_depth") == 16 for info in per_res.values() if isinstance(info, dict)):
            features.add(f"16bit_{m_type}")
    final_metadata["shader_features"] = sorted(features)

    # Files destined for the Extra folder: this asset's 'extra'/'ignored'
    # classifications plus every globally unmatched file.
    extra_sources = {
        str(info['source_path'])
        for category in ('extra', 'ignored')
        for info in filtered_classified_files_asset.get(category, [])
    }
    extra_sources.update(str(p) for p in unmatched_files_paths)
    final_metadata["source_files_in_extra"] = sorted(extra_sources)

    final_metadata["image_stats_1k"] = current_asset_metadata.get("image_stats_1k", {})
    final_metadata["map_details"] = map_details_asset
    final_metadata["aspect_ratio_change_string"] = current_asset_metadata.get("aspect_ratio_change_string", "N/A")
    final_metadata["_processing_info"] = {
        "preset_used": self.config.preset_name,
        "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "input_source": str(self.input_path.name),  # original input source
    }

    # Defensive re-sort of list values just before serialization.
    for key in ("maps_present", "merged_maps", "shader_features", "source_files_in_extra"):
        value = final_metadata.get(key)
        if isinstance(value, list):
            value.sort()

    # Prefix the temp filename with the asset name to avoid collisions.
    metadata_filename = f"{asset_name}_{self.config.metadata_filename}"
    output_path = self.temp_dir / metadata_filename
    log.debug(f"Writing metadata for asset '{asset_name}' to temporary file: {output_path}")
    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_metadata, f, indent=4, ensure_ascii=False, sort_keys=True)
        log.info(f"Metadata file '{metadata_filename}' generated successfully for asset '{asset_name}'.")
        return output_path  # path to the temporary metadata file
    except Exception as e:
        raise AssetProcessingError(f"Failed to write metadata file {output_path} for asset '{asset_name}': {e}") from e
def _normalize_aspect_ratio_change(self, original_width, original_height, resized_width, resized_height, decimals=2):
    """
    Build the aspect-ratio-change code (e.g. "EVEN", "X133") for a resize.

    Ports the original prototype formula: each axis' relative change is
    normalized so 1.0 means "unchanged" and clamped to [0, 2]; both axes are
    then rescaled so the less-changed axis lands exactly on 1, and the other
    axis' scaled value is embedded in the code with its decimal point removed.

    Returns "InvalidInput" / "InvalidResize" for non-positive dimensions.
    """
    # Reject degenerate dimensions up front.
    if original_width <= 0 or original_height <= 0:
        log.warning("Cannot calculate aspect ratio change with zero original dimensions.")
        return "InvalidInput"
    if resized_width <= 0 or resized_height <= 0:
        log.warning("Cannot calculate aspect ratio change with zero resized dimensions.")
        return "InvalidResize"
    # Per-axis relative change shifted so 1.0 == unchanged, clamped to [0, 2].
    clamped_w = min(max(((resized_width - original_width) / original_width) * 100 / 100 + 1, 0), 2)
    clamped_h = min(max(((resized_height - original_height) / original_height) * 100 / 100 + 1, 0), 2)
    # Pivot = smallest non-zero magnitude; 1.0 when both axes collapsed to 0
    # (aspect ratio effectively maintained, so scale factor becomes 1).
    nonzero_magnitudes = [abs(v) for v in (clamped_w, clamped_h) if v != 0]
    pivot = min(nonzero_magnitudes) if nonzero_magnitudes else 1.0
    # Epsilon keeps the division safe if the pivot is vanishingly small.
    epsilon = 1e-9
    scale_factor = 1 / (pivot + epsilon) if abs(pivot) < epsilon else 1 / pivot
    output_width = round(scale_factor * clamped_w, decimals)
    output_height = round(scale_factor * clamped_h, decimals)
    # Snap values that rounded to 1.0 onto the integer 1 so the comparisons
    # and string formatting below behave consistently.
    if abs(output_width - 1.0) < epsilon:
        output_width = 1
    if abs(output_height - 1.0) < epsilon:
        output_height = 1

    def _strip_dot(value):
        # "1.33" -> "133" for embedding in the code string.
        return str(value).replace('.', '')

    if original_width == original_height or abs(output_width - output_height) < epsilon:
        result = "EVEN"
    elif output_width != 1 and output_height == 1:
        result = f"X{_strip_dot(output_width)}"
    elif output_height != 1 and output_width == 1:
        result = f"Y{_strip_dot(output_height)}"
    else:
        # Both axes changed relative to each other.
        result = f"X{_strip_dot(output_width)}Y{_strip_dot(output_height)}"
    log.debug(f"Aspect ratio change calculated: Orig=({original_width}x{original_height}), Resized=({resized_width}x{resized_height}) -> String='{result}'")
    return result
def _sanitize_filename(self, name: str) -> str:
"""Removes or replaces characters invalid for filenames/directory names."""
# ... (Implementation from Response #51) ...
if not isinstance(name, str): name = str(name)
name = re.sub(r'[^\w.\-]+', '_', name) # Allow alphanumeric, underscore, hyphen, dot
name = re.sub(r'_+', '_', name)
name = name.strip('_')
if not name: name = "invalid_name"
return name
def _organize_output_files(self, current_asset_name: str, processed_maps_details_asset: Dict[str, Dict[str, Dict]], merged_maps_details_asset: Dict[str, Dict[str, Dict]], filtered_classified_files_asset: Dict[str, List[Dict]], unmatched_files_paths: List[Path], temp_metadata_path: Path) -> None:
    """
    Moves/copies processed files for a specific asset from the temp dir to the final output structure.

    Produces: <output_base_path>/<supplier>/<asset>/ containing the processed
    and merged maps, model files and the metadata file, plus an extra-files
    subdirectory for asset-specific extra/ignored files (moved) and files
    unmatched to any asset (copied, so every asset gets its own copy).

    Args:
        current_asset_name: The sanitized name of the asset being organized.
        processed_maps_details_asset: Details of processed maps for this asset
            (map_type -> resolution key -> details dict with a 'path' entry).
        merged_maps_details_asset: Details of merged maps for this asset (same shape).
        filtered_classified_files_asset: Classified files dictionary filtered for this asset.
        unmatched_files_paths: List of relative paths for files not matched to any base name.
        temp_metadata_path: Path to the temporary metadata file for this asset.

    Raises:
        AssetProcessingError: If the workspace, asset name or supplier name is
            missing, or if the final directory cannot be prepared. Individual
            per-file move/copy failures are logged but do not abort the step.
    """
    if not self.temp_dir or not self.temp_dir.exists(): raise AssetProcessingError("Temp workspace missing.")
    if not current_asset_name or current_asset_name == "UnknownAssetName": raise AssetProcessingError("Asset name missing for organization.")
    supplier_name = self.config.supplier_name # Get supplier name from config
    if not supplier_name: raise AssetProcessingError("Supplier name missing from config.")
    supplier_sanitized = self._sanitize_filename(supplier_name)
    asset_name_sanitized = self._sanitize_filename(current_asset_name) # Already sanitized, but ensure consistency
    final_dir = self.output_base_path / supplier_sanitized / asset_name_sanitized
    log.info(f"Organizing output files for asset '{asset_name_sanitized}' into: {final_dir}")
    try:
        # Handle overwrite logic specifically for this asset's directory:
        # with overwrite enabled, an existing directory is wiped first.
        if final_dir.exists() and self.overwrite:
            log.warning(f"Output directory exists for '{asset_name_sanitized}' and overwrite is True. Removing existing directory: {final_dir}")
            try:
                shutil.rmtree(final_dir)
            except Exception as rm_err:
                raise AssetProcessingError(f"Failed to remove existing output directory {final_dir} for asset '{asset_name_sanitized}' during overwrite: {rm_err}") from rm_err
        # Note: Skip check should prevent this if overwrite is False, but mkdir handles exist_ok=True
        final_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        # Wrap unexpected errors; re-raise AssetProcessingError untouched so the
        # original cause/chain is preserved for the caller.
        if not isinstance(e, AssetProcessingError):
            raise AssetProcessingError(f"Failed to create final dir {final_dir} for asset '{asset_name_sanitized}': {e}") from e
        else:
            raise
    # --- Helper for moving files ---
    # Keep track of files successfully moved to avoid copying them later as 'unmatched'.
    # Shared mutable state between the two closures below: _safe_move records
    # what it moved; _safe_copy consults that set to skip duplicates.
    moved_source_files = set()
    def _safe_move(src_rel_path: Path | None, dest_dir: Path, file_desc: str):
        # Move one workspace-relative file into dest_dir; failures are logged, not raised.
        if not src_rel_path: log.warning(f"Asset '{asset_name_sanitized}': Missing src path for {file_desc}."); return
        source_abs = self.temp_dir / src_rel_path
        # Use the original filename from the source path for the destination
        dest_abs = dest_dir / src_rel_path.name
        try:
            if source_abs.exists():
                log.debug(f"Asset '{asset_name_sanitized}': Moving {file_desc}: {source_abs.name} -> {dest_dir.relative_to(self.output_base_path)}/")
                dest_dir.mkdir(parents=True, exist_ok=True)
                shutil.move(str(source_abs), str(dest_abs))
                moved_source_files.add(src_rel_path) # Track successfully moved source files
            else: log.warning(f"Asset '{asset_name_sanitized}': Source file missing for {file_desc}: {source_abs}")
        except Exception as e: log.error(f"Asset '{asset_name_sanitized}': Failed moving {file_desc} '{source_abs.name}': {e}", exc_info=True)
    # --- Helper for copying files (for unmatched extras) ---
    def _safe_copy(src_rel_path: Path | None, dest_dir: Path, file_desc: str):
        # Copy (not move) so the same unmatched file can appear under several assets.
        if not src_rel_path: log.warning(f"Asset '{asset_name_sanitized}': Missing src path for {file_desc} copy."); return
        # Skip copying if this source file was already moved (e.g., it was an 'Extra' for this specific asset)
        if src_rel_path in moved_source_files:
            log.debug(f"Asset '{asset_name_sanitized}': Skipping copy of {file_desc} '{src_rel_path.name}' as it was already moved.")
            return
        source_abs = self.temp_dir / src_rel_path
        dest_abs = dest_dir / src_rel_path.name
        try:
            if source_abs.exists():
                # Avoid copying if the exact destination file already exists (e.g., from a previous asset's copy)
                if dest_abs.exists():
                    log.debug(f"Asset '{asset_name_sanitized}': Destination file already exists for {file_desc} copy: {dest_abs.name}. Skipping copy.")
                    return
                log.debug(f"Asset '{asset_name_sanitized}': Copying {file_desc}: {source_abs.name} -> {dest_dir.relative_to(self.output_base_path)}/")
                dest_dir.mkdir(parents=True, exist_ok=True)
                shutil.copy2(str(source_abs), str(dest_abs)) # Use copy2 to preserve metadata
            else: log.warning(f"Asset '{asset_name_sanitized}': Source file missing for {file_desc} copy: {source_abs}")
        except Exception as e: log.error(f"Asset '{asset_name_sanitized}': Failed copying {file_desc} '{source_abs.name}': {e}", exc_info=True)
    # --- Move Processed/Merged Maps ---
    # Entries with an 'error' key are whole-map-type failures and are skipped.
    for details_dict in [processed_maps_details_asset, merged_maps_details_asset]:
        for map_type, res_dict in details_dict.items():
            if 'error' in res_dict: continue
            for res_key, details in res_dict.items():
                if isinstance(details, dict) and 'path' in details:
                    _safe_move(details['path'], final_dir, f"{map_type} ({res_key})")
    # --- Move Models specific to this asset ---
    for model_info in filtered_classified_files_asset.get('models', []):
        _safe_move(model_info.get('source_path'), final_dir, "model file")
    # --- Move Metadata File ---
    # The temp file carries an asset-prefixed name; it is renamed to the
    # standard metadata filename on arrival.
    if temp_metadata_path and temp_metadata_path.exists():
        final_metadata_path = final_dir / self.config.metadata_filename # Use standard name
        try:
            log.debug(f"Asset '{asset_name_sanitized}': Moving metadata file: {temp_metadata_path.name} -> {final_metadata_path.relative_to(self.output_base_path)}")
            shutil.move(str(temp_metadata_path), str(final_metadata_path))
            # No need to add metadata path to moved_source_files as it's uniquely generated
        except Exception as e:
            log.error(f"Asset '{asset_name_sanitized}': Failed moving metadata file '{temp_metadata_path.name}': {e}", exc_info=True)
    else:
        log.warning(f"Asset '{asset_name_sanitized}': Temporary metadata file path missing or file does not exist: {temp_metadata_path}")
    # --- Handle Extra/Ignored/Unmatched Files ---
    extra_subdir_name = self.config.extra_files_subdir
    extra_dir = final_dir / extra_subdir_name
    if filtered_classified_files_asset.get('extra') or filtered_classified_files_asset.get('ignored') or unmatched_files_paths:
        try:
            extra_dir.mkdir(parents=True, exist_ok=True)
            # Move asset-specific Extra/Ignored files (moves must run before the
            # unmatched copies so moved_source_files is fully populated).
            files_to_move_extra = filtered_classified_files_asset.get('extra', []) + filtered_classified_files_asset.get('ignored', [])
            if files_to_move_extra:
                log.debug(f"Asset '{asset_name_sanitized}': Moving {len(files_to_move_extra)} asset-specific files to '{extra_subdir_name}/'...")
                for file_info in files_to_move_extra:
                    _safe_move(file_info.get('source_path'), extra_dir, f"extra/ignored file ({file_info.get('reason', 'Unknown')})")
            # Copy unmatched files
            if unmatched_files_paths:
                log.debug(f"Asset '{asset_name_sanitized}': Copying {len(unmatched_files_paths)} unmatched files to '{extra_subdir_name}/'...")
                for file_path in unmatched_files_paths:
                    _safe_copy(file_path, extra_dir, "unmatched file")
        except Exception as e: log.error(f"Asset '{asset_name_sanitized}': Failed creating/moving/copying to Extra dir {extra_dir}: {e}", exc_info=True)
    log.info(f"Finished organizing output for asset '{asset_name_sanitized}'.")
def _cleanup_workspace(self):
    """Delete the temporary workspace directory, if one is still present.

    On success self.temp_dir is reset to None; on failure the error is
    logged and the attribute is left pointing at the (possibly partial)
    directory.
    """
    # Nothing to do when no workspace was created or it is already gone.
    if not (self.temp_dir and self.temp_dir.exists()):
        return
    try:
        log.debug(f"Cleaning up temporary workspace: {self.temp_dir}")
        shutil.rmtree(self.temp_dir)
        self.temp_dir = None
        log.debug("Temporary workspace cleaned up successfully.")
    except Exception as e:
        log.error(f"Failed to remove temporary workspace {self.temp_dir}: {e}", exc_info=True)
# --- Prediction Method ---
def predict_output_structure(self) -> tuple[str | None, str | None, dict[str, str] | None] | None:
    """
    Predicts the final output structure (supplier, asset name) and attempts
    to predict output filenames for potential map files based on naming conventions.

    This is a lightweight preview: it only reads a ZIP's table of contents or
    a directory's top-level file list and applies the configured naming rules.
    It does not perform full processing, extraction, or image loading.

    Returns:
        tuple[str | None, str | None, dict[str, str] | None]:
            (sanitized_supplier_name, sanitized_asset_name, file_predictions_dict)
            where file_predictions_dict maps input filename -> predicted output filename.
        Returns None if prediction fails critically (unreadable input, no
        files found, or missing supplier name).
    """
    log.debug(f"Predicting output structure and filenames for: {self.input_path.name}")
    try:
        # 1. Get Supplier Name
        supplier_name = self.config.supplier_name
        if not supplier_name:
            log.warning("Supplier name not found in configuration during prediction.")
            return None
        # 2. List Input Filenames/Stems
        candidate_stems = set() # Use set for unique stems
        filenames = []
        if self.input_path.is_file() and self.input_path.suffix.lower() == '.zip':
            try:
                with zipfile.ZipFile(self.input_path, 'r') as zip_ref:
                    # Get only filenames, ignore directories
                    filenames = [Path(f).name for f in zip_ref.namelist() if not f.endswith('/')]
            except zipfile.BadZipFile:
                log.error(f"Bad ZIP file during prediction: {self.input_path.name}")
                return None
            except Exception as zip_err:
                log.error(f"Error reading ZIP file list during prediction for {self.input_path.name}: {zip_err}")
                return None # Cannot proceed if we can't list files
        elif self.input_path.is_dir():
            try:
                for item in self.input_path.iterdir():
                    if item.is_file(): # Only consider files directly in the folder for prediction simplicity
                        filenames.append(item.name)
                # Note: Not walking subdirs for prediction to keep it fast
            except Exception as dir_err:
                log.error(f"Error listing directory contents during prediction for {self.input_path.name}: {dir_err}")
                return None
        if not filenames:
            log.warning(f"No files found in input for prediction: {self.input_path.name}")
            return None # Return None if no files found
        # 3. Lightweight Classification for Stems and Potential Maps
        map_type_mapping = self.config.map_type_mapping
        model_patterns = self.config.asset_category_rules.get('model_patterns', [])
        separator = self.config.source_naming_separator
        processed_filenames = set() # Track full filenames processed
        potential_map_files = {} # Store fname -> potential map_type
        for fname in filenames:
            if fname in processed_filenames: continue
            fstem = Path(fname).stem
            fstem_lower = fstem.lower()
            name_parts = fstem_lower.split(separator)
            # Check map rules first (first matching rule/keyword wins)
            map_matched = False
            for mapping_rule in map_type_mapping:
                source_keywords, standard_map_type = mapping_rule
                if standard_map_type not in self.config.standard_map_types: continue
                for keyword in source_keywords:
                    kw_lower = keyword.lower().strip('*')
                    if kw_lower in name_parts:
                        # NOTE(review): list membership above already implies an exact
                        # part match, so is_exact_match is always True here.
                        is_exact_match = any(part == kw_lower for part in name_parts)
                        if is_exact_match:
                            candidate_stems.add(fstem) # Add unique stem
                            potential_map_files[fname] = standard_map_type # Store potential type
                            processed_filenames.add(fname)
                            map_matched = True
                            break # Found keyword match for this rule
                if map_matched: break # Found a rule match for this file
            if map_matched: continue # Move to next filename if identified as map
            # Check model patterns if not a map
            for pattern in model_patterns:
                if fnmatch(fname.lower(), pattern.lower()):
                    candidate_stems.add(fstem) # Still add stem for base name determination
                    processed_filenames.add(fname)
                    # Don't add models to potential_map_files
                    break # Found model match
            # Note: Files matching neither maps nor models are ignored for prediction details
        log.debug(f"[PREDICTION] Potential map files identified: {potential_map_files}")
        candidate_stems_list = list(candidate_stems) # Convert set to list for commonprefix
        log.debug(f"[PREDICTION] Candidate stems identified: {candidate_stems_list}")
        if not candidate_stems_list:
            log.warning(f"Prediction: No relevant map/model stems found in {self.input_path.name}. Using input name as fallback.")
            # Fallback: Use the input path's name itself if no stems found
            base_name_fallback = self.input_path.stem if self.input_path.is_file() else self.input_path.name
            determined_base_name = base_name_fallback
        else:
            # 4. Replicate _determine_base_metadata logic for base name
            determined_base_name = "UnknownAssetName"
            separator = self.config.source_naming_separator
            indices_dict = self.config.source_naming_indices
            base_index_raw = indices_dict.get('base_name')
            log.debug(f"[PREDICTION] Base Name Determination: Separator='{separator}', Indices Dict={indices_dict}, Raw Base Index='{base_index_raw}'")
            base_index = None
            if base_index_raw is not None:
                try:
                    base_index = int(base_index_raw) # Use explicit conversion like in main logic
                except (ValueError, TypeError):
                    log.warning(f"[PREDICTION] Could not convert base_name index '{base_index_raw}' to integer.")
            if isinstance(base_index, int):
                # Structured extraction: take the part at base_index from every stem.
                potential_base_names = set()
                for stem in candidate_stems_list: # Iterate over the list
                    parts = stem.split(separator)
                    log.debug(f"[PREDICTION] Processing stem: '{stem}', Parts: {parts}")
                    if len(parts) > base_index:
                        extracted_name = parts[base_index]
                        potential_base_names.add(extracted_name)
                        log.debug(f"[PREDICTION] Extracted potential base name: '{extracted_name}' using index {base_index}")
                    else:
                        log.debug(f"[PREDICTION] Stem '{stem}' has too few parts ({len(parts)}) for index {base_index}.")
                if len(potential_base_names) == 1:
                    determined_base_name = potential_base_names.pop()
                    log.debug(f"[PREDICTION] Determined base name '{determined_base_name}' from structured parts (index {base_index}).")
                elif len(potential_base_names) > 1:
                    log.debug(f"[PREDICTION] Multiple potential base names found from index {base_index}: {potential_base_names}. Falling back to common prefix.")
                    determined_base_name = os.path.commonprefix(candidate_stems_list) # Use list here
                    # NOTE(review): strip() already trims both ends, so the chained rstrip() is redundant.
                    determined_base_name = determined_base_name.strip(separator + ' _').rstrip(separator + ' _')
                # else: Use common prefix below
            if determined_base_name == "UnknownAssetName" or not determined_base_name:
                log.debug("[PREDICTION] Falling back to common prefix for base name determination (structured parts failed or no index).")
                determined_base_name = os.path.commonprefix(candidate_stems_list) # Use list here
                determined_base_name = determined_base_name.strip(separator + ' _').rstrip(separator + ' _')
        # 5. Sanitize Names
        final_base_name = self._sanitize_filename(determined_base_name or "UnknownAssetName")
        log.debug(f"[PREDICTION] Final determined base name for prediction: '{final_base_name}'")
        final_supplier_name = self._sanitize_filename(supplier_name)
        # 6. Predict Output Filenames
        file_predictions = {}
        target_pattern = self.config.target_filename_pattern
        # Use highest resolution key as a placeholder for prediction
        highest_res_key = "Res?" # Fallback
        if self.config.image_resolutions:
            highest_res_key = max(self.config.image_resolutions, key=self.config.image_resolutions.get)
        for input_fname, map_type in potential_map_files.items():
            # Assume PNG for prediction, extension might change based on bit depth rules later
            # but this gives a good idea of the renaming.
            # A more complex prediction could check bit depth rules.
            predicted_ext = "png" # Simple assumption for preview
            try:
                predicted_fname = target_pattern.format(
                    base_name=final_base_name,
                    map_type=map_type,
                    resolution=highest_res_key, # Use placeholder resolution
                    ext=predicted_ext
                )
                file_predictions[input_fname] = predicted_fname
            except KeyError as fmt_err:
                log.warning(f"Prediction: Error formatting filename for {input_fname} (KeyError: {fmt_err}). Skipping file prediction.")
                file_predictions[input_fname] = "[Filename Format Error]"
        log.debug(f"Predicted structure: Supplier='{final_supplier_name}', Asset='{final_base_name}', Files={len(file_predictions)}")
        return final_supplier_name, final_base_name, file_predictions
    except Exception as e:
        # Catch-all boundary: prediction is best-effort, so log and return None.
        log.error(f"Error during output structure prediction for {self.input_path.name}: {e}", exc_info=True)
        return None
# --- New Detailed Prediction Method ---
def get_detailed_file_predictions(self) -> list[dict] | None:
    """
    Performs extraction and classification to provide a detailed list of all
    files found within the input and their predicted status/output name,
    handling multiple potential assets within the input.

    Unlike predict_output_structure(), this method actually sets up the
    workspace, extracts the input, and runs the full classification pipeline;
    the workspace is always cleaned up afterwards (see the finally block).

    Returns:
        list[dict] | None: A list of dictionaries, each representing a file:
                           {'original_path': str,
                            'predicted_asset_name': str | None,
                            'predicted_output_name': str | None,
                            'status': str,
                            'details': str | None}
        Returns None if a critical error occurs during setup/classification.
    """
    log.info(f"Getting detailed file predictions for input: {self.input_path.name}")
    results = []
    all_files_in_workspace = [] # Keep track of all files found
    try:
        # --- Perform necessary setup and classification ---
        self._setup_workspace()
        self._extract_input()
        # Run classification - this populates self.classified_files
        self._inventory_and_classify_files()
        # --- Determine distinct assets and file mapping ---
        # This uses the results from _inventory_and_classify_files
        distinct_base_names, file_to_base_name_map = self._determine_base_metadata()
        log.debug(f"Prediction: Determined base names: {distinct_base_names}")
        log.debug(f"Prediction: File to base name map: { {str(k):v for k,v in file_to_base_name_map.items()} }")
        # --- Apply Suffixes for Prediction Preview ---
        # This logic is similar to the main process method but applied to the
        # classified_files list, so variant maps preview as e.g. "Albedo-1".
        log.debug("Prediction: Applying map type suffixes for preview...")
        grouped_classified_maps = defaultdict(list)
        for map_info in self.classified_files.get('maps', []):
            # Group by the base map type
            grouped_classified_maps[map_info['map_type']].append(map_info)
        # Create a new list for maps with updated types for prediction
        maps_with_predicted_types = []
        for base_map_type, maps_in_group in grouped_classified_maps.items():
            respect_variants = base_map_type in self.config.respect_variant_map_types
            # Sort maps within the group for consistent suffixing (using the same key as in _inventory_and_classify_files)
            maps_in_group.sort(key=lambda c: (
                c.get('preset_rule_index', 9999),
                c.get('keyword_index_in_rule', 9999) if 'keyword_index_in_rule' in c else 9999, # Handle potential missing key
                str(c['source_path'])
            ))
            for i, map_info in enumerate(maps_in_group):
                predicted_map_type = f"{base_map_type}-{i + 1}" if respect_variants else base_map_type
                # Create a copy to avoid modifying the original classified_files list in place
                map_info_copy = map_info.copy()
                map_info_copy['predicted_map_type'] = predicted_map_type # Store the predicted type
                maps_with_predicted_types.append(map_info_copy)
        # Replace the original maps list with the one containing predicted types for the next step
        # Note: This is a temporary list for prediction generation, not modifying the instance's classified_files permanently
        # self.classified_files["maps"] = maps_with_predicted_types # Avoid modifying instance state
        # --- Prepare for filename prediction ---
        target_pattern = self.config.target_filename_pattern
        highest_res_key = "Res?" # Placeholder resolution for prediction
        if self.config.image_resolutions:
            highest_res_key = max(self.config.image_resolutions, key=self.config.image_resolutions.get)
        # --- Process all classified files (including maps with predicted types) ---
        all_classified_files_with_category = []
        # Add maps with predicted types first
        for map_info in maps_with_predicted_types:
            map_info['category'] = 'maps' # Ensure category is set
            all_classified_files_with_category.append(map_info)
            if 'source_path' in map_info:
                all_files_in_workspace.append(map_info['source_path'])
        # Add other categories (models, extra, ignored)
        for category in ['models', 'extra', 'ignored']:
            for file_info in self.classified_files.get(category, []):
                file_info['category'] = category
                all_classified_files_with_category.append(file_info)
                if 'source_path' in file_info:
                    all_files_in_workspace.append(file_info['source_path'])
        # --- Generate results for each file ---
        processed_paths = set() # Track paths already added to results
        for file_info in all_classified_files_with_category:
            original_path = file_info.get("source_path")
            if not original_path or original_path in processed_paths:
                continue # Skip if path missing or already processed
            original_path_str = str(original_path)
            processed_paths.add(original_path) # Mark as processed
            # Determine predicted asset name and status
            predicted_asset_name = file_to_base_name_map.get(original_path) # Can be None
            category = file_info['category'] # maps, models, extra, ignored
            reason = file_info.get('reason') # Specific reason for extra/ignored
            status = "Unknown"
            details = None
            predicted_output_name = None # Usually original name, except for maps
            if category == "maps":
                status = "Mapped"
                # Use the predicted_map_type for the preview display
                map_type_for_preview = file_info.get("predicted_map_type", file_info.get("map_type", "UnknownType"))
                details = f"[{map_type_for_preview}]"
                if file_info.get("is_16bit_source"): details += " (16-bit)"
                # Predict map output name using its determined asset name and predicted map type
                if predicted_asset_name:
                    try:
                        predicted_ext = "png" # Assume PNG for prediction simplicity
                        predicted_output_name = target_pattern.format(
                            base_name=predicted_asset_name,
                            map_type=map_type_for_preview, # Use the predicted type here
                            resolution=highest_res_key,
                            ext=predicted_ext
                        )
                    except KeyError as fmt_err:
                        log.warning(f"Prediction format error for map {original_path_str}: {fmt_err}")
                        predicted_output_name = "[Format Error]"
                        details += f" (Format Key Error: {fmt_err})"
                    except Exception as pred_err:
                        log.warning(f"Prediction error for map {original_path_str}: {pred_err}")
                        predicted_output_name = "[Prediction Error]"
                        details += f" (Error: {pred_err})"
                else:
                    # Should not happen for maps if _determine_base_metadata worked correctly
                    log.warning(f"Map file '{original_path_str}' has no predicted asset name.")
                    predicted_output_name = "[No Asset Name]"
            elif category == "models":
                status = "Model"
                details = "[Model]"
                predicted_output_name = original_path.name # Models keep original name
            elif category == "ignored":
                status = "Ignored"
                details = f"Ignored ({reason or 'Unknown reason'})"
                predicted_output_name = None # Ignored files have no output
            elif category == "extra":
                if predicted_asset_name is None:
                    # This is an "Unmatched Extra" file (includes Unrecognised and explicit Extras without a base name)
                    status = "Unmatched Extra"
                    details = f"[Unmatched Extra ({reason or 'N/A'})]" # Include original reason if available
                elif reason == 'Unrecognised':
                    # Unrecognised but belongs to a specific asset
                    status = "Unrecognised"
                    details = "[Unrecognised]"
                else:
                    # Explicitly matched an 'extra' pattern and belongs to an asset
                    status = "Extra"
                    details = f"Extra ({reason})"
                predicted_output_name = original_path.name # Extra files keep original name
            else:
                log.warning(f"Unknown category '{category}' encountered during prediction for {original_path_str}")
                status = "Error"
                details = f"[Unknown Category: {category}]"
                predicted_output_name = original_path.name
            results.append({
                "original_path": original_path_str,
                "predicted_asset_name": predicted_asset_name, # May be None
                "predicted_output_name": predicted_output_name,
                "status": status,
                "details": details
            })
        # Add any files found during walk but missed by classification (should be rare)
        # These are likely unmatched as well. NOTE(review): all_files_in_workspace is
        # built from the classified lists above, so this loop is normally a no-op.
        for file_path in all_files_in_workspace:
            if file_path not in processed_paths:
                log.warning(f"File found in workspace but not classified: {file_path}. Adding as Unmatched Extra.")
                results.append({
                    "original_path": str(file_path),
                    "predicted_asset_name": None, # Explicitly None as it wasn't mapped
                    "predicted_output_name": file_path.name,
                    "status": "Unmatched Extra",
                    "details": "[Missed Classification]"
                })
        log.info(f"Detailed prediction complete for input '{self.input_path.name}'. Found {len(results)} files.")
        # Sort results by original path for consistent display
        results.sort(key=lambda x: x.get("original_path", ""))
        return results
    except (AssetProcessingError, ConfigurationError, Exception) as e:
        # NOTE(review): Exception already covers the first two types; the tuple is redundant.
        log.error(f"Critical error during detailed prediction for {self.input_path.name}: {e}", exc_info=True)
        return None # Indicate critical failure
    finally:
        # Ensure cleanup always happens
        self._cleanup_workspace()
# --- End of AssetProcessor Class ---