Pipeline simplification - Needs testing!

This commit is contained in:
2025-05-12 13:31:58 +02:00
parent 5bf53f036c
commit 4ffb2ff78c
7 changed files with 1046 additions and 1049 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -2,14 +2,12 @@ import logging
from pathlib import Path
from typing import Dict, Optional, List, Tuple
import numpy as np
import cv2 # For potential direct cv2 operations if ipu doesn't cover all merge needs
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from rule_structure import FileRule
from utils.path_utils import sanitize_filename
from ...utils import image_processing_utils as ipu
logger = logging.getLogger(__name__)
@@ -34,48 +32,20 @@ class MapMergingStage(ProcessingStage):
if context.status_flags.get('skip_asset'):
logger.info(f"Skipping map merging for asset {asset_name_for_log} as skip_asset flag is set.")
return context
if not hasattr(context, 'merged_maps_details'):
context.merged_maps_details = {}
if not hasattr(context, 'merged_image_tasks'):
context.merged_image_tasks = []
if not hasattr(context, 'processed_maps_details'):
logger.warning(f"Asset {asset_name_for_log}: 'processed_maps_details' not found in context. Cannot perform map merging.")
logger.warning(f"Asset {asset_name_for_log}: 'processed_maps_details' not found in context. Cannot generate merge tasks.")
return context
if not context.files_to_process: # This list might not be relevant if merge rules are defined elsewhere or implicitly
logger.info(f"Asset {asset_name_for_log}: No files_to_process defined. This stage might rely on config or processed_maps_details directly for merge rules.")
# Depending on design, this might not be an error, so we don't return yet.
logger.info(f"Starting MapMergingStage for asset: {asset_name_for_log}")
# TODO: The logic for identifying merge rules and their inputs needs significant rework
# as FileRule no longer has 'id' or 'merge_settings' directly in the way this stage expects.
# Merge rules are likely defined in the main configuration (context.config_obj.map_merge_rules)
# and need to be matched against available maps in context.processed_maps_details.
# Placeholder for the loop that would iterate over context.config_obj.map_merge_rules
# For now, this stage will effectively do nothing until that logic is implemented.
# Example of how one might start to adapt:
# for configured_merge_rule in context.config_obj.map_merge_rules:
# output_map_type = configured_merge_rule.get('output_map_type')
# inputs_config = configured_merge_rule.get('inputs') # e.g. {"R": "NORMAL", "G": "ROUGHNESS"}
# # ... then find these input map_types in context.processed_maps_details ...
# # ... and perform the merge ...
# # This is a complex change beyond simple attribute renaming.
# The following is the original loop structure, which will likely fail due to missing attributes on FileRule.
# Keeping it commented out to show what was there.
"""
for merge_rule in context.files_to_process: # This iteration logic is likely incorrect for merge rules
if not isinstance(merge_rule, FileRule) or merge_rule.item_type != "MAP_MERGE":
continue
# FileRule does not have merge_settings or id.hex
# This entire block needs to be re-thought based on where merge rules are defined.
# Assuming merge_rule_id_hex would be a generated UUID for this operation.
merge_rule_id_hex = f"merge_op_{uuid.uuid4().hex[:8]}"
current_map_type = merge_rule.item_type_override or merge_rule.item_type
# The core merge rules are in context.config_obj.map_merge_rules
# Each rule in there defines an output_map_type and its inputs.
logger.error(f"Asset {asset_name_for_log}, Potential Merge for {current_map_type}: Merge rule processing needs rework. FileRule lacks 'merge_settings' and 'id'. Skipping this rule.")
context.merged_maps_details[merge_rule_id_hex] = {
@@ -84,7 +54,7 @@ class MapMergingStage(ProcessingStage):
'reason': 'Merge rule processing logic in MapMergingStage needs refactor due to FileRule changes.'
}
continue
"""
# For now, let's assume no merge rules are processed until the logic is fixed.
num_merge_rules_attempted = 0
@@ -115,24 +85,20 @@ class MapMergingStage(ProcessingStage):
merge_op_id = f"merge_{sanitize_filename(output_map_type)}_{rule_idx}"
logger.info(f"Asset {asset_name_for_log}: Processing configured merge rule for '{output_map_type}' (Op ID: {merge_op_id})")
loaded_input_maps: Dict[str, np.ndarray] = {} # Key: input_map_type (e.g. "NRM"), Value: image_data
input_map_paths: Dict[str, str] = {} # Key: input_map_type, Value: path_str
target_dims: Optional[Tuple[int, int]] = None
all_inputs_valid = True
# Find and load input maps from processed_maps_details
# This assumes one processed map per map_type. If multiple variants exist, this needs refinement.
input_map_sources_list = []
source_bit_depths_list = []
primary_source_dimensions = None
# Find required input maps from processed_maps_details
required_input_map_types = set(inputs_map_type_to_channel.values())
for required_map_type in required_input_map_types:
found_processed_map_details = None
# The key `p_key_idx` is the file_rule_idx from the IndividualMapProcessingStage
for p_key_idx, p_details in context.processed_maps_details.items(): # p_key_idx is an int
# Iterate through processed_maps_details to find the required map type
for p_key_idx, p_details in context.processed_maps_details.items():
processed_map_identifier = p_details.get('processing_map_type', p_details.get('map_type'))
# Comprehensive list of valid statuses for an input map to be used in merging
valid_input_statuses = ['BasePOTSaved', 'Processed_With_Variants', 'Processed_No_Variants', 'Converted_To_Rough']
# Check for a match, considering both "MAP_TYPE" and "TYPE" formats
is_match = False
if processed_map_identifier == required_map_type:
is_match = True
@@ -141,207 +107,56 @@ class MapMergingStage(ProcessingStage):
elif not required_map_type.startswith("MAP_") and processed_map_identifier == f"MAP_{required_map_type}":
is_match = True
if is_match and p_details.get('status') in valid_input_statuses:
found_processed_map_details = p_details
# The key `p_key_idx` (which is the FileRule index) is implicitly associated with these details.
break
if not found_processed_map_details:
can_be_fully_defaulted = True
channels_requiring_this_map = [
ch_key for ch_key, map_type_val in inputs_map_type_to_channel.items()
if map_type_val == required_map_type
]
# Check if the found map is in a usable status and has a temporary file
valid_input_statuses = ['BasePOTSaved', 'Processed_With_Variants', 'Processed_No_Variants', 'Converted_To_Rough'] # Add other relevant statuses if needed
if is_match and p_details.get('status') in valid_input_statuses and p_details.get('temp_processed_file'):
# Also check if the temp file actually exists on disk
if Path(p_details.get('temp_processed_file')).exists():
found_processed_map_details = p_details
break # Found a suitable input, move to the next required map type
if not channels_requiring_this_map:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Internal logic error. Required map_type '{required_map_type}' is not actually used by any output channel. Configuration: {inputs_map_type_to_channel}")
all_inputs_valid = False
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Internal error: required map_type '{required_map_type}' not in use."}
break
for channel_char_needing_default in channels_requiring_this_map:
if default_values.get(channel_char_needing_default) is None:
can_be_fully_defaulted = False
break
if can_be_fully_defaulted:
logger.info(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Required input map_type '{required_map_type}' for output '{output_map_type}' not found or not in usable state. Will attempt to use default values for its channels: {channels_requiring_this_map}.")
else:
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Required input map_type '{required_map_type}' for output '{output_map_type}' not found/unusable, AND not all its required channels ({channels_requiring_this_map}) have defaults. Failing merge op.")
all_inputs_valid = False
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Input '{required_map_type}' missing and defaults incomplete."}
break
if found_processed_map_details:
temp_file_path_str = found_processed_map_details.get('temp_processed_file')
if not temp_file_path_str:
# Log with p_key_idx if available, or just the map type if not (though it should be if found_processed_map_details is set)
log_key_info = f"(Associated Key Index: {p_key_idx})" if 'p_key_idx' in locals() and found_processed_map_details else "" # Use locals() to check if p_key_idx is defined in this scope
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: 'temp_processed_file' missing in details for found map_type '{required_map_type}' {log_key_info}.")
all_inputs_valid = False
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Temp file path missing for input '{required_map_type}'."}
break
temp_file_path = Path(temp_file_path_str)
if not temp_file_path.exists():
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Temp file {temp_file_path} for input map_type '{required_map_type}' does not exist.")
all_inputs_valid = False
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Temp file for input '{required_map_type}' missing."}
break
file_path = found_processed_map_details.get('temp_processed_file')
dimensions = found_processed_map_details.get('base_pot_dimensions')
try:
image_data = ipu.load_image(str(temp_file_path))
if image_data is None: raise ValueError("Loaded image is None")
except Exception as e:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error loading image {temp_file_path} for input map_type '{required_map_type}': {e}")
all_inputs_valid = False
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Error loading input '{required_map_type}'."}
break
loaded_input_maps[required_map_type] = image_data
input_map_paths[required_map_type] = str(temp_file_path)
# Attempt to get original_bit_depth, log warning if not found
original_bit_depth = found_processed_map_details.get('original_bit_depth')
if original_bit_depth is None:
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: 'original_bit_depth' not found in processed_maps_details for map type '{required_map_type}'. This value is pending IndividualMapProcessingStage refactoring and will be None or a default for now.")
current_dims = (image_data.shape[1], image_data.shape[0])
if target_dims is None:
target_dims = current_dims
elif current_dims != target_dims:
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Input map '{required_map_type}' dims {current_dims} differ from target {target_dims}. Resizing.")
try:
image_data_resized = ipu.resize_image(image_data, target_dims[0], target_dims[1])
if image_data_resized is None: raise ValueError("Resize returned None")
loaded_input_maps[required_map_type] = image_data_resized
except Exception as e:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Failed to resize '{required_map_type}': {e}")
all_inputs_valid = False
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Failed to resize input '{required_map_type}'."}
break
if not all_inputs_valid:
logger.warning(f"Asset {asset_name_for_log}: Skipping merge for Op ID {merge_op_id} ('{output_map_type}') due to invalid inputs.")
continue
input_map_sources_list.append({
'map_type': required_map_type,
'file_path': file_path,
'dimensions': dimensions,
'original_bit_depth': original_bit_depth
})
source_bit_depths_list.append(original_bit_depth)
if not loaded_input_maps and not any(default_values.get(ch) is not None for ch in inputs_map_type_to_channel.keys()):
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: No input maps loaded and no defaults available for any channel for '{output_map_type}'. Cannot proceed.")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': 'No input maps loaded and no defaults available.'}
continue
if target_dims is None:
default_res_key = context.config_obj.get("default_output_resolution_key_for_merge", "1K")
image_resolutions_cfg = getattr(context.config_obj, "image_resolutions", {})
default_max_dim = image_resolutions_cfg.get(default_res_key)
if default_max_dim:
target_dims = (default_max_dim, default_max_dim)
logger.info(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Target dimensions not set by inputs (all defaulted). Using configured default resolution '{default_res_key}': {target_dims}.")
# Set primary_source_dimensions from the first valid input found
if primary_source_dimensions is None and dimensions:
primary_source_dimensions = dimensions
else:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Target dimensions could not be determined for '{output_map_type}' (all inputs defaulted and no default output resolution configured).")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': 'Target dimensions undetermined for fully defaulted merge.'}
continue
output_channel_keys = sorted(list(inputs_map_type_to_channel.keys()))
num_output_channels = len(output_channel_keys)
if num_output_channels == 0:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: No output channels defined in 'inputs' for '{output_map_type}'.")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': 'No output channels defined.'}
continue
try:
output_dtype = np.uint8
if num_output_channels == 1:
merged_image = np.zeros((target_dims[1], target_dims[0]), dtype=output_dtype)
else:
merged_image = np.zeros((target_dims[1], target_dims[0], num_output_channels), dtype=output_dtype)
except Exception as e:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error creating empty merged image for '{output_map_type}': {e}")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f'Error creating output canvas: {e}'}
continue
merge_op_failed_detail = False
for i, out_channel_char in enumerate(output_channel_keys):
input_map_type_for_this_channel = inputs_map_type_to_channel[out_channel_char]
source_image = loaded_input_maps.get(input_map_type_for_this_channel)
source_data_this_channel = None
if source_image is not None:
if source_image.dtype != np.uint8:
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Input map '{input_map_type_for_this_channel}' has dtype {source_image.dtype}, expected uint8. Attempting conversion.")
source_image = ipu.convert_to_uint8(source_image)
if source_image is None:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Failed to convert input '{input_map_type_for_this_channel}' to uint8.")
merge_op_failed_detail = True; break
# If a required map is not found, log a warning but don't fail the task generation.
# The consuming stage will handle missing inputs and fallbacks.
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Required input map type '{required_map_type}' not found or not in a usable state in context.processed_maps_details. This input will be skipped for task generation.")
if source_image.ndim == 2:
source_data_this_channel = source_image
elif source_image.ndim == 3:
semantic_to_bgr_idx = {'R': 2, 'G': 1, 'B': 0, 'A': 3}
idx_to_extract = semantic_to_bgr_idx.get(out_channel_char.upper())
if idx_to_extract is not None and idx_to_extract < source_image.shape[2]:
source_data_this_channel = source_image[:, :, idx_to_extract]
logger.debug(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: For output '{out_channel_char}', using source '{input_map_type_for_this_channel}' semantic '{out_channel_char}' (BGR(A) index {idx_to_extract}).")
else:
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Could not map output '{out_channel_char}' to a specific BGR(A) channel of '{input_map_type_for_this_channel}' (shape {source_image.shape}). Defaulting to its channel 0 (Blue).")
source_data_this_channel = source_image[:, :, 0]
else:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Source image '{input_map_type_for_this_channel}' has unexpected dimensions: {source_image.ndim} (shape {source_image.shape}).")
merge_op_failed_detail = True; break
else:
default_val_for_channel = default_values.get(out_channel_char)
if default_val_for_channel is not None:
try:
scaled_default_val = int(float(default_val_for_channel) * 255)
source_data_this_channel = np.full((target_dims[1], target_dims[0]), scaled_default_val, dtype=np.uint8)
logger.info(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Using default value {default_val_for_channel} (scaled to {scaled_default_val}) for output channel '{out_channel_char}' as input map '{input_map_type_for_this_channel}' was missing.")
except ValueError:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Default value '{default_val_for_channel}' for channel '{out_channel_char}' is not a valid float. Cannot scale.")
merge_op_failed_detail = True; break
else:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Input map '{input_map_type_for_this_channel}' for output channel '{out_channel_char}' is missing and no default value provided.")
merge_op_failed_detail = True; break
if source_data_this_channel is None:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Failed to get source data for output channel '{out_channel_char}'.")
merge_op_failed_detail = True; break
try:
if merged_image.ndim == 2:
merged_image = source_data_this_channel
else:
merged_image[:, :, i] = source_data_this_channel
except Exception as e:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error assigning data to output channel '{out_channel_char}' (index {i}): {e}. Merged shape: {merged_image.shape}, Source data shape: {source_data_this_channel.shape}")
merge_op_failed_detail = True; break
if merge_op_failed_detail:
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': 'Error during channel assignment.'}
continue
output_format = 'png'
temp_merged_filename = f"merged_{sanitize_filename(output_map_type)}_{merge_op_id}.{output_format}"
temp_merged_path = context.engine_temp_dir / temp_merged_filename
try:
save_success = ipu.save_image(str(temp_merged_path), merged_image)
if not save_success: raise ValueError("Save image returned false")
except Exception as e:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error saving merged image {temp_merged_path}: {e}")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f'Failed to save merged image: {e}'}
continue
logger.info(f"Asset {asset_name_for_log}: Successfully merged and saved '{output_map_type}' (Op ID: {merge_op_id}) to {temp_merged_path}")
context.merged_maps_details[merge_op_id] = {
'map_type': output_map_type,
'temp_merged_file': str(temp_merged_path),
'input_map_types_used': list(inputs_map_type_to_channel.values()),
'input_map_files_used': input_map_paths,
'merged_dimensions': target_dims,
'status': 'Processed'
# Create the merged image task dictionary
merged_task = {
'output_map_type': output_map_type,
'input_map_sources': input_map_sources_list,
'merge_rule_config': configured_merge_rule,
'source_dimensions': primary_source_dimensions, # Can be None if no inputs were found
'source_bit_depths': source_bit_depths_list
}
logger.info(f"Finished MapMergingStage for asset: {asset_name_for_log}. Merged maps operations attempted: {num_merge_rules_attempted}, Succeeded: {len([d for d in context.merged_maps_details.values() if d.get('status') == 'Processed'])}")
# Append the task to the context
context.merged_image_tasks.append(merged_task)
logger.info(f"Asset {asset_name_for_log}: Generated merge task for '{output_map_type}' (Op ID: {merge_op_id}). Task details: {merged_task}")
# Note: We no longer populate context.merged_maps_details with 'Processed' status here,
# as this stage only generates tasks, it doesn't perform the merge or save files.
# The merged_maps_details will be populated by the stage that consumes these tasks.
logger.info(f"Finished MapMergingStage for asset: {asset_name_for_log}. Merge tasks generated: {len(context.merged_image_tasks)}")
return context

View File

@@ -163,6 +163,37 @@ def calculate_target_dimensions(
# --- Image Statistics ---
def get_image_bit_depth(image_path_str: str) -> Optional[int]:
"""
Determines the bit depth of an image file.
"""
try:
# Use IMREAD_UNCHANGED to preserve original bit depth
img = cv2.imread(image_path_str, cv2.IMREAD_UNCHANGED)
if img is None:
# logger.error(f"Failed to read image for bit depth: {image_path_str}") # Use print for utils
print(f"Warning: Failed to read image for bit depth: {image_path_str}")
return None
dtype_to_bit_depth = {
np.dtype('uint8'): 8,
np.dtype('uint16'): 16,
np.dtype('float32'): 32, # Typically for EXR etc.
np.dtype('int8'): 8, # Unlikely for images but good to have
np.dtype('int16'): 16, # Unlikely
# Add other dtypes if necessary
}
bit_depth = dtype_to_bit_depth.get(img.dtype)
if bit_depth is None:
# logger.warning(f"Unknown dtype {img.dtype} for image {image_path_str}, cannot determine bit depth.") # Use print for utils
print(f"Warning: Unknown dtype {img.dtype} for image {image_path_str}, cannot determine bit depth.")
pass # Return None
return bit_depth
except Exception as e:
# logger.error(f"Error getting bit depth for {image_path_str}: {e}") # Use print for utils
print(f"Error getting bit depth for {image_path_str}: {e}")
return None
def calculate_image_stats(image_data: np.ndarray) -> Optional[Dict]:
"""
Calculates min, max, mean for a given numpy image array.

View File

@@ -0,0 +1,250 @@
import logging
import cv2
import numpy as np
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional
# Potentially import ipu from ...utils import image_processing_utils as ipu
# Assuming ipu is available in the same utils directory or parent
try:
from . import image_processing_utils as ipu
except ImportError:
# Fallback for different import structures if needed, adjust based on actual project structure
# For this project structure, the relative import should work.
logging.warning("Could not import image_processing_utils using relative path. Attempting absolute import.")
try:
from processing.utils import image_processing_utils as ipu
except ImportError:
logging.error("Could not import image_processing_utils.")
ipu = None # Handle case where ipu is not available
logger = logging.getLogger(__name__)
def save_image_variants(
source_image_data: np.ndarray,
base_map_type: str, # Filename-friendly map type
source_bit_depth_info: List[Optional[int]],
image_resolutions: Dict[str, int],
file_type_defs: Dict[str, Dict[str, Any]],
output_format_8bit: str,
output_format_16bit_primary: str,
output_format_16bit_fallback: str,
png_compression_level: int,
jpg_quality: int,
output_filename_pattern_tokens: Dict[str, Any], # Must include 'output_base_directory': Path and 'asset_name': str
output_filename_pattern: str,
# Consider adding ipu or relevant parts of it if not importing globally
) -> List[Dict[str, Any]]:
"""
Centralizes image saving logic, generating and saving various resolution variants
according to configuration.
Args:
source_image_data (np.ndarray): High-res image data (in memory, potentially transformed).
base_map_type (str): Final map type (e.g., "COL", "ROUGH", "NORMAL", "MAP_NRMRGH").
This is the filename-friendly map type.
source_bit_depth_info (List[Optional[int]]): List of original source bit depth(s)
(e.g., [8], [16], [8, 16]). Can contain None.
image_resolutions (Dict[str, int]): Dictionary mapping resolution keys (e.g., "4K")
to max dimensions (e.g., 4096).
file_type_defs (Dict[str, Dict[str, Any]]): Dictionary defining properties for map types,
including 'bit_depth_rule'.
output_format_8bit (str): File extension for 8-bit output (e.g., "jpg", "png").
output_format_16bit_primary (str): Primary file extension for 16-bit output (e.g., "png", "tif").
output_format_16bit_fallback (str): Fallback file extension for 16-bit output.
png_compression_level (int): Compression level for PNG output (0-9).
jpg_quality (int): Quality level for JPG output (0-100).
output_filename_pattern_tokens (Dict[str, Any]): Dictionary of tokens for filename
pattern replacement. Must include
'output_base_directory' (Path) and
'asset_name' (str).
output_filename_pattern (str): Pattern string for generating output filenames
(e.g., "[assetname]_[maptype]_[resolution].[ext]").
Returns:
List[Dict[str, Any]]: A list of dictionaries, each containing details about a saved file.
Example: [{'path': str, 'resolution_key': str, 'format': str,
'bit_depth': int, 'dimensions': (w,h)}, ...]
"""
if ipu is None:
logger.error("image_processing_utils is not available. Cannot save images.")
return []
saved_file_details = []
source_h, source_w = source_image_data.shape[:2]
source_max_dim = max(source_h, source_w)
# 1. Use provided configuration inputs (already available as function arguments)
logger.info(f"Saving variants for map type: {base_map_type}")
# 2. Determine Target Bit Depth
target_bit_depth = 8 # Default
bit_depth_rule = file_type_defs.get(base_map_type, {}).get('bit_depth_rule', 'force_8bit')
if bit_depth_rule not in ['force_8bit', 'respect_inputs']:
logger.warning(f"Unknown bit_depth_rule '{bit_depth_rule}' for map type '{base_map_type}'. Defaulting to 'force_8bit'.")
bit_depth_rule = 'force_8bit'
if bit_depth_rule == 'respect_inputs':
# Check if any source bit depth is > 8, ignoring None
if any(depth is not None and depth > 8 for depth in source_bit_depth_info):
target_bit_depth = 16
else:
target_bit_depth = 8
logger.info(f"Bit depth rule 'respect_inputs' applied. Source bit depths: {source_bit_depth_info}. Target bit depth: {target_bit_depth}")
else: # force_8bit
target_bit_depth = 8
logger.info(f"Bit depth rule 'force_8bit' applied. Target bit depth: {target_bit_depth}")
# 3. Determine Output File Format(s)
if target_bit_depth == 8:
output_ext = output_format_8bit.lstrip('.').lower()
elif target_bit_depth == 16:
# Prioritize primary, fallback to fallback if primary is not supported/desired
# For now, just use primary. More complex logic might be needed later.
output_ext = output_format_16bit_primary.lstrip('.').lower()
# Basic fallback logic example (can be expanded)
if output_ext not in ['png', 'tif']: # Assuming common 16-bit formats
output_ext = output_format_16bit_fallback.lstrip('.').lower()
logger.warning(f"Primary 16-bit format '{output_format_16bit_primary}' might not be suitable. Using fallback '{output_format_16bit_fallback}'.")
else:
logger.error(f"Unsupported target bit depth: {target_bit_depth}. Defaulting to 8-bit format.")
output_ext = output_format_8bit.lstrip('.').lower()
logger.info(f"Target bit depth: {target_bit_depth}, Output format: {output_ext}")
# 4. Generate and Save Resolution Variants
# Sort resolutions by max dimension descending
sorted_resolutions = sorted(image_resolutions.items(), key=lambda item: item[1], reverse=True)
for res_key, res_max_dim in sorted_resolutions:
logger.info(f"Processing resolution variant: {res_key} ({res_max_dim} max dim)")
# Calculate target dimensions, ensuring no upscaling
if source_max_dim <= res_max_dim:
# If source is smaller or equal, use source dimensions
target_w_res, target_h_res = source_w, source_h
if source_max_dim < res_max_dim:
logger.info(f"Source image ({source_w}x{source_h}) is smaller than target resolution {res_key} ({res_max_dim}). Saving at source resolution.")
else:
# Downscale, maintaining aspect ratio
aspect_ratio = source_w / source_h
if source_w > source_h:
target_w_res = res_max_dim
target_h_res = int(res_max_dim / aspect_ratio)
else:
target_h_res = res_max_dim
target_w_res = int(res_max_dim * aspect_ratio)
logger.info(f"Resizing source image ({source_w}x{source_h}) to {target_w_res}x{target_h_res} for {res_key} variant.")
# Resize source_image_data
# Use INTER_AREA for downscaling, INTER_LINEAR or INTER_CUBIC for upscaling (though we avoid upscaling here)
interpolation_method = cv2.INTER_AREA # Good for downscaling
# If we were allowing upscaling, we might add logic like:
# if target_w_res > source_w or target_h_res > source_h:
# interpolation_method = cv2.INTER_LINEAR # Or INTER_CUBIC
try:
variant_data = ipu.resize_image(source_image_data, (target_w_res, target_h_res), interpolation=interpolation_method)
logger.debug(f"Resized variant data shape: {variant_data.shape}")
except Exception as e:
logger.error(f"Error resizing image for {res_key} variant: {e}")
continue # Skip this variant if resizing fails
# Filename Construction
current_tokens = output_filename_pattern_tokens.copy()
current_tokens['maptype'] = base_map_type
current_tokens['resolution'] = res_key
current_tokens['ext'] = output_ext
try:
# Replace placeholders in the pattern
filename = output_filename_pattern
for token, value in current_tokens.items():
# Ensure value is string for replacement, handle Path objects later
filename = filename.replace(f"[{token}]", str(value))
# Construct full output path
output_base_directory = current_tokens.get('output_base_directory')
if not isinstance(output_base_directory, Path):
logger.error(f"'output_base_directory' token is missing or not a Path object: {output_base_directory}. Cannot save file.")
continue # Skip this variant
output_path = output_base_directory / filename
logger.info(f"Constructed output path: {output_path}")
# Ensure parent directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)
logger.debug(f"Ensured directory exists: {output_path.parent}")
except Exception as e:
logger.error(f"Error constructing filepath for {res_key} variant: {e}")
continue # Skip this variant if path construction fails
# Prepare Save Parameters
save_params_cv2 = []
if output_ext == 'jpg':
save_params_cv2.append(cv2.IMWRITE_JPEG_QUALITY)
save_params_cv2.append(jpg_quality)
logger.debug(f"Using JPG quality: {jpg_quality}")
elif output_ext == 'png':
save_params_cv2.append(cv2.IMWRITE_PNG_COMPRESSION)
save_params_cv2.append(png_compression_level)
logger.debug(f"Using PNG compression level: {png_compression_level}")
# Add other format specific parameters if needed (e.g., TIFF compression)
# Bit Depth Conversion (just before saving)
image_data_for_save = variant_data
try:
if target_bit_depth == 8:
image_data_for_save = ipu.convert_to_uint8(variant_data)
logger.debug("Converted variant data to uint8.")
elif target_bit_depth == 16:
# ipu.convert_to_uint16 might handle different input types (float, uint8)
# Assuming variant_data might be float after resizing, convert to uint16
image_data_for_save = ipu.convert_to_uint16(variant_data)
logger.debug("Converted variant data to uint16.")
# Add other bit depth conversions if needed
except Exception as e:
logger.error(f"Error converting image data to target bit depth {target_bit_depth} for {res_key} variant: {e}")
continue # Skip this variant if conversion fails
# Saving
try:
# ipu.save_image is expected to handle the actual cv2.imwrite call
success = ipu.save_image(str(output_path), image_data_for_save, params=save_params_cv2)
if success:
logger.info(f"Successfully saved {res_key} variant to {output_path}")
# Collect details for the returned list
saved_file_details.append({
'path': str(output_path),
'resolution_key': res_key,
'format': output_ext,
'bit_depth': target_bit_depth,
'dimensions': (target_w_res, target_h_res)
})
else:
logger.error(f"Failed to save {res_key} variant to {output_path}")
except Exception as e:
logger.error(f"Error saving image for {res_key} variant to {output_path}: {e}")
# Continue to next variant even if one fails
# Discard in-memory variant after saving (Python's garbage collection handles this)
del variant_data
del image_data_for_save
# 5. Return List of Saved File Details
logger.info(f"Finished saving variants for map type: {base_map_type}. Saved {len(saved_file_details)} variants.")
return saved_file_details
# Optional Helper Functions (can be added here if needed)
# def _determine_target_bit_depth(...): ...
# def _determine_output_format(...): ...
# def _construct_variant_filepath(...): ...