Prototype > PreAlpha #67

Merged
Rusfort merged 54 commits from Dev into Stable 2025-05-15 09:10:54 +02:00
18 changed files with 857 additions and 677 deletions
Showing only changes of commit deeb1595fd - Show all commits

21
main.py
View File

@ -25,18 +25,39 @@ from PySide6.QtWidgets import QApplication
import sys
import os
sys.path.append(os.path.dirname(__file__))
print(f"DEBUG: sys.path after append: {sys.path}")
try:
print("DEBUG: Attempting to import Configuration...")
from configuration import Configuration, ConfigurationError
print("DEBUG: Successfully imported Configuration.")
print("DEBUG: Attempting to import ProcessingEngine...")
from processing_engine import ProcessingEngine
print("DEBUG: Successfully imported ProcessingEngine.")
print("DEBUG: Attempting to import SourceRule...")
from rule_structure import SourceRule
print("DEBUG: Successfully imported SourceRule.")
print("DEBUG: Attempting to import MainWindow...")
from gui.main_window import MainWindow
print("DEBUG: Successfully imported MainWindow.")
print("DEBUG: Attempting to import prepare_processing_workspace...")
from utils.workspace_utils import prepare_processing_workspace
print("DEBUG: Successfully imported prepare_processing_workspace.")
except ImportError as e:
script_dir = Path(__file__).parent.resolve()
print(f"ERROR: Cannot import Configuration or rule_structure classes.")
print(f"Ensure configuration.py and rule_structure.py are in the same directory or Python path.")
print(f"ERROR: Failed to import necessary classes: {e}")
print(f"DEBUG: Exception type: {type(e)}")
print(f"DEBUG: Exception args: {e.args}")
import traceback
print("DEBUG: Full traceback of the ImportError:")
traceback.print_exc()
print(f"Ensure 'configuration.py' and 'asset_processor.py' exist in the directory:")
print(f" {script_dir}")
print("Or that the directory is included in your PYTHONPATH.")

View File

@ -61,15 +61,13 @@ class PipelineOrchestrator:
# Create a temporary directory for this processing run if needed by any stage
# This temp dir is for the entire source_rule processing, not per asset.
# Individual stages might create their own sub-temp dirs if necessary.
temp_dir_path_str = tempfile.mkdtemp(
prefix="asset_processor_orchestrator_temp_", dir=self.config_obj.get_temp_directory_base()
)
temp_dir_path_str = tempfile.mkdtemp(prefix=self.config_obj.temp_dir_prefix)
engine_temp_dir_path = Path(temp_dir_path_str)
log.debug(f"PipelineOrchestrator created temporary directory: {engine_temp_dir_path}")
log.debug(f"PipelineOrchestrator created temporary directory: {engine_temp_dir_path} using prefix '{self.config_obj.temp_dir_prefix}'")
for asset_rule in source_rule.assets:
log.debug(f"Orchestrator: Processing asset '{asset_rule.name}'")
log.debug(f"Orchestrator: Processing asset '{asset_rule.asset_name}'")
context = AssetProcessingContext(
source_rule=source_rule,
asset_rule=asset_rule,
@ -85,43 +83,43 @@ class PipelineOrchestrator:
config_obj=self.config_obj,
status_flags={"skip_asset": False, "asset_failed": False}, # Initialize common flags
incrementing_value=incrementing_value,
sha256_value=sha5_value # Parameter name in context is sha256_value
sha5_value=sha5_value
)
for stage_idx, stage in enumerate(self.stages):
log.debug(f"Asset '{asset_rule.name}': Executing stage {stage_idx + 1}/{len(self.stages)}: {stage.__class__.__name__}")
log.debug(f"Asset '{asset_rule.asset_name}': Executing stage {stage_idx + 1}/{len(self.stages)}: {stage.__class__.__name__}")
try:
context = stage.execute(context)
except Exception as e:
log.error(f"Asset '{asset_rule.name}': Error during stage '{stage.__class__.__name__}': {e}", exc_info=True)
log.error(f"Asset '{asset_rule.asset_name}': Error during stage '{stage.__class__.__name__}': {e}", exc_info=True)
context.status_flags["asset_failed"] = True
context.asset_metadata["status"] = f"Failed: Error in stage {stage.__class__.__name__}"
context.asset_metadata["error_message"] = str(e)
break # Stop processing stages for this asset on error
if context.status_flags.get("skip_asset"):
log.info(f"Asset '{asset_rule.name}': Skipped by stage '{stage.__class__.__name__}'. Reason: {context.status_flags.get('skip_reason', 'N/A')}")
log.info(f"Asset '{asset_rule.asset_name}': Skipped by stage '{stage.__class__.__name__}'. Reason: {context.status_flags.get('skip_reason', 'N/A')}")
break # Skip remaining stages for this asset
# Refined status collection
if context.status_flags.get('skip_asset'):
overall_status["skipped"].append(asset_rule.name)
overall_status["skipped"].append(asset_rule.asset_name)
elif context.status_flags.get('asset_failed') or str(context.asset_metadata.get('status', '')).startswith("Failed"):
overall_status["failed"].append(asset_rule.name)
overall_status["failed"].append(asset_rule.asset_name)
elif context.asset_metadata.get('status') == "Processed":
overall_status["processed"].append(asset_rule.name)
overall_status["processed"].append(asset_rule.asset_name)
else: # Default or unknown state
log.warning(f"Asset '{asset_rule.name}': Unknown status after pipeline execution. Metadata status: '{context.asset_metadata.get('status')}'. Marking as failed.")
overall_status["failed"].append(f"{asset_rule.name} (Unknown Status: {context.asset_metadata.get('status')})")
log.debug(f"Asset '{asset_rule.name}' final status: {context.asset_metadata.get('status', 'N/A')}, Flags: {context.status_flags}")
log.warning(f"Asset '{asset_rule.asset_name}': Unknown status after pipeline execution. Metadata status: '{context.asset_metadata.get('status')}'. Marking as failed.")
overall_status["failed"].append(f"{asset_rule.asset_name} (Unknown Status: {context.asset_metadata.get('status')})")
log.debug(f"Asset '{asset_rule.asset_name}' final status: {context.asset_metadata.get('status', 'N/A')}, Flags: {context.status_flags}")
except Exception as e:
log.error(f"PipelineOrchestrator.process_source_rule failed: {e}", exc_info=True)
# Mark all remaining assets as failed if a top-level error occurs
processed_or_skipped_or_failed = set(overall_status["processed"] + overall_status["skipped"] + overall_status["failed"])
for asset_rule in source_rule.assets:
if asset_rule.name not in processed_or_skipped_or_failed:
overall_status["failed"].append(f"{asset_rule.name} (Orchestrator Error)")
if asset_rule.asset_name not in processed_or_skipped_or_failed:
overall_status["failed"].append(f"{asset_rule.asset_name} (Orchestrator Error)")
finally:
if engine_temp_dir_path and engine_temp_dir_path.exists():
try:

View File

@ -8,8 +8,8 @@ import numpy as np
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from ...utils import image_processing_utils as ipu
from .....rule_structure import FileRule, TransformSettings
from .....utils.path_utils import sanitize_filename
from rule_structure import FileRule
from utils.path_utils import sanitize_filename
logger = logging.getLogger(__name__)
@ -21,31 +21,34 @@ class AlphaExtractionToMaskStage(ProcessingStage):
SUITABLE_SOURCE_MAP_TYPES = ["ALBEDO", "DIFFUSE", "BASE_COLOR"] # Map types likely to have alpha
def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
logger.debug(f"Asset '{context.asset_rule.name}': Running AlphaExtractionToMaskStage.")
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
logger.debug(f"Asset '{asset_name_for_log}': Running AlphaExtractionToMaskStage.")
if context.status_flags.get('skip_asset'):
logger.debug(f"Asset '{context.asset_rule.name}': Skipping due to 'skip_asset' flag.")
logger.debug(f"Asset '{asset_name_for_log}': Skipping due to 'skip_asset' flag.")
return context
if not context.files_to_process or not context.processed_maps_details:
logger.debug(
f"Asset '{context.asset_rule.name}': Skipping alpha extraction - "
f"Asset '{asset_name_for_log}': Skipping alpha extraction - "
f"no files to process or no processed map details."
)
return context
# A. Check for Existing MASK Map
for file_rule in context.files_to_process:
if file_rule.map_type == "MASK":
# Assuming file_rule has 'map_type' and 'file_path' (instead of filename_pattern)
if hasattr(file_rule, 'map_type') and file_rule.map_type == "MASK":
file_path_for_log = file_rule.file_path if hasattr(file_rule, 'file_path') else "Unknown file path"
logger.info(
f"Asset '{context.asset_rule.name}': MASK map already defined by FileRule "
f"'{file_rule.filename_pattern}'. Skipping alpha extraction."
f"Asset '{asset_name_for_log}': MASK map already defined by FileRule "
f"for '{file_path_for_log}'. Skipping alpha extraction."
)
return context
# B. Find Suitable Source Map with Alpha
source_map_details_for_alpha: Optional[Dict] = None
source_file_rule_id_for_alpha: Optional[str] = None
source_file_rule_id_for_alpha: Optional[str] = None # This ID comes from processed_maps_details keys
for file_rule_id, details in context.processed_maps_details.items():
if details.get('status') == 'Processed' and \
@ -54,33 +57,31 @@ class AlphaExtractionToMaskStage(ProcessingStage):
temp_path = Path(details['temp_processed_file'])
if not temp_path.exists():
logger.warning(
f"Asset '{context.asset_rule.name}': Temp file {temp_path} for map "
f"Asset '{asset_name_for_log}': Temp file {temp_path} for map "
f"{details['map_type']} (ID: {file_rule_id}) does not exist. Cannot check for alpha."
)
continue
# Load image header or minimal data to check for alpha if possible,
# otherwise load full image. ipu.load_image should handle this.
image_data = ipu.load_image(temp_path)
image_data = ipu.load_image(temp_path)
if image_data is not None and image_data.ndim == 3 and image_data.shape[2] == 4:
source_map_details_for_alpha = details
source_file_rule_id_for_alpha = file_rule_id
logger.info(
f"Asset '{context.asset_rule.name}': Found potential source for alpha extraction: "
f"Asset '{asset_name_for_log}': Found potential source for alpha extraction: "
f"{temp_path} (MapType: {details['map_type']})"
)
break
break
except Exception as e:
logger.warning(
f"Asset '{context.asset_rule.name}': Error checking alpha for {details.get('temp_processed_file', 'N/A')}: {e}"
f"Asset '{asset_name_for_log}': Error checking alpha for {details.get('temp_processed_file', 'N/A')}: {e}"
)
continue
if source_map_details_for_alpha is None or source_file_rule_id_for_alpha is None:
logger.info(
f"Asset '{context.asset_rule.name}': No suitable source map with alpha channel found "
f"Asset '{asset_name_for_log}': No suitable source map with alpha channel found "
f"for MASK extraction."
)
return context
@ -91,7 +92,7 @@ class AlphaExtractionToMaskStage(ProcessingStage):
if full_image_data is None or not (full_image_data.ndim == 3 and full_image_data.shape[2] == 4):
logger.error(
f"Asset '{context.asset_rule.name}': Failed to reload or verify alpha channel from "
f"Asset '{asset_name_for_log}': Failed to reload or verify alpha channel from "
f"{source_image_path} for MASK extraction."
)
return context
@ -99,15 +100,13 @@ class AlphaExtractionToMaskStage(ProcessingStage):
alpha_channel: np.ndarray = full_image_data[:, :, 3] # Extract alpha (0-255)
# D. Save New Temporary MASK Map
# Ensure the mask is a 2D grayscale image. If ipu.save_image expects 3 channels for grayscale, adapt.
# Assuming ipu.save_image can handle a 2D numpy array for a grayscale image.
if alpha_channel.ndim == 2: # Expected
pass
elif alpha_channel.ndim == 3 and alpha_channel.shape[2] == 1: # (H, W, 1)
alpha_channel = alpha_channel.squeeze(axis=2)
else:
logger.error(
f"Asset '{context.asset_rule.name}': Extracted alpha channel has unexpected dimensions: "
f"Asset '{asset_name_for_log}': Extracted alpha channel has unexpected dimensions: "
f"{alpha_channel.shape}. Cannot save."
)
return context
@ -122,54 +121,54 @@ class AlphaExtractionToMaskStage(ProcessingStage):
if not save_success:
logger.error(
f"Asset '{context.asset_rule.name}': Failed to save extracted alpha mask to {mask_temp_path}."
f"Asset '{asset_name_for_log}': Failed to save extracted alpha mask to {mask_temp_path}."
)
return context
logger.info(
f"Asset '{context.asset_rule.name}': Extracted alpha and saved as new MASK map: {mask_temp_path}"
f"Asset '{asset_name_for_log}': Extracted alpha and saved as new MASK map: {mask_temp_path}"
)
# E. Create New FileRule for the MASK and Update Context
new_mask_file_rule_id_obj = uuid.uuid4()
new_mask_file_rule_id_str = str(new_mask_file_rule_id_obj) # Use string for FileRule.id
new_mask_file_rule_id_hex = new_mask_file_rule_id_obj.hex # Use hex for dict key
# FileRule does not have id, active, transform_settings, source_map_ids_for_generation
# It has file_path, item_type, item_type_override, etc.
new_mask_file_rule = FileRule(
id=new_mask_file_rule_id_str,
map_type="MASK",
filename_pattern=mask_temp_path.name, # Pattern matches the generated temp file
item_type="MAP_COL", # Considered a collected map post-generation
active=True,
transform_settings=TransformSettings(), # Default transform settings
source_map_ids_for_generation=[source_file_rule_id_for_alpha] # Link to original source
# Ensure other necessary FileRule fields are defaulted or set if required
file_path=mask_temp_path.name, # Use file_path
item_type="MAP_MASK", # This should be the item_type for a mask
map_type="MASK" # Explicitly set map_type if FileRule has it, or handle via item_type
# Other FileRule fields like item_type_override can be set if needed
)
# If FileRule needs a unique identifier, it should be handled differently,
# perhaps by generating one and storing it in common_metadata or a separate mapping.
# For now, we create a simple FileRule.
context.files_to_process.append(new_mask_file_rule)
# For processed_maps_details, we need a unique key. Using a new UUID.
new_mask_processed_map_key = uuid.uuid4().hex
original_dims = source_map_details_for_alpha.get('original_dimensions')
if original_dims is None and full_image_data is not None: # Fallback if not in details
original_dims = (full_image_data.shape[1], full_image_data.shape[0])
context.processed_maps_details[new_mask_file_rule_id_hex] = {
context.processed_maps_details[new_mask_processed_map_key] = {
'map_type': "MASK",
'source_file': str(source_image_path), # Original RGBA map path
'temp_processed_file': str(mask_temp_path), # Path to the new MASK map
'original_dimensions': original_dims, # Dimensions of the source image
'processed_dimensions': (alpha_channel.shape[1], alpha_channel.shape[0]), # Dimensions of MASK
'status': 'Processed', # This map is now considered processed
'source_file': str(source_image_path),
'temp_processed_file': str(mask_temp_path),
'original_dimensions': original_dims,
'processed_dimensions': (alpha_channel.shape[1], alpha_channel.shape[0]),
'status': 'Processed',
'notes': (
f"Generated from alpha of {source_map_details_for_alpha['map_type']} "
f"(Source Rule ID: {source_file_rule_id_for_alpha})"
f"(Source Detail ID: {source_file_rule_id_for_alpha})" # Changed from Source Rule ID
),
'file_rule_id': new_mask_file_rule_id_str # Link back to the new FileRule ID
# 'file_rule_id': new_mask_file_rule_id_str # FileRule doesn't have an ID to link here directly
}
logger.info(
f"Asset '{context.asset_rule.name}': Added new FileRule for generated MASK "
f"(ID: {new_mask_file_rule_id_str}) and updated processed_maps_details."
f"Asset '{asset_name_for_log}': Added new FileRule for generated MASK "
f"and updated processed_maps_details with key '{new_mask_processed_map_key}'."
)
return context

View File

@ -1,6 +1,6 @@
import logging
from ..base_stage import ProcessingStage
from ...asset_context import AssetProcessingContext
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
class AssetSkipLogicStage(ProcessingStage):
"""
@ -17,31 +17,38 @@ class AssetSkipLogicStage(ProcessingStage):
The updated asset processing context.
"""
context.status_flags['skip_asset'] = False # Initialize/reset skip flag
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
# 1. Check for Supplier Error
# Assuming 'supplier_error' might be set by a previous stage (e.g., SupplierDeterminationStage)
# or if effective_supplier is None after attempts to determine it.
if context.effective_supplier is None or context.status_flags.get('supplier_error', False):
logging.info(f"Asset '{context.asset_rule.name}': Skipping due to missing or invalid supplier.")
logging.info(f"Asset '{asset_name_for_log}': Skipping due to missing or invalid supplier.")
context.status_flags['skip_asset'] = True
context.status_flags['skip_reason'] = "Invalid or missing supplier"
return context
# 2. Check asset_rule.process_status
if context.asset_rule.process_status == "SKIP":
logging.info(f"Asset '{context.asset_rule.name}': Skipping as per process_status 'SKIP'.")
# 2. Check process_status in asset_rule.common_metadata
process_status = context.asset_rule.common_metadata.get('process_status')
if process_status == "SKIP":
logging.info(f"Asset '{asset_name_for_log}': Skipping as per common_metadata.process_status 'SKIP'.")
context.status_flags['skip_asset'] = True
context.status_flags['skip_reason'] = "Process status set to SKIP"
context.status_flags['skip_reason'] = "Process status set to SKIP in common_metadata"
return context
if context.asset_rule.process_status == "PROCESSED" and \
not context.config_obj.general_settings.overwrite_existing:
# Assuming context.config_obj.general_settings.overwrite_existing is a valid path.
# This might need adjustment if 'general_settings' or 'overwrite_existing' is not found.
# For now, we'll assume it's correct based on the original code's intent.
if process_status == "PROCESSED" and \
hasattr(context.config_obj, 'general_settings') and \
not getattr(context.config_obj.general_settings, 'overwrite_existing', True): # Default to True (allow overwrite) if not found
logging.info(
f"Asset '{context.asset_rule.name}': Skipping as it's already 'PROCESSED' "
f"Asset '{asset_name_for_log}': Skipping as it's already 'PROCESSED' (from common_metadata) "
f"and overwrite is disabled."
)
context.status_flags['skip_asset'] = True
context.status_flags['skip_reason'] = "Already processed, overwrite disabled"
context.status_flags['skip_reason'] = "Already processed (common_metadata), overwrite disabled"
return context
# If none of the above conditions are met, skip_asset remains False.

View File

@ -2,9 +2,9 @@ import logging
import fnmatch
from typing import List, Set
from ..base_stage import ProcessingStage
from ...asset_context import AssetProcessingContext
from .....rule_structure import FileRule
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from rule_structure import FileRule
class FileRuleFilterStage(ProcessingStage):
@ -23,46 +23,56 @@ class FileRuleFilterStage(ProcessingStage):
Returns:
The modified AssetProcessingContext.
"""
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
if context.status_flags.get('skip_asset'):
logging.debug(f"Asset '{context.asset_rule.name}': Skipping FileRuleFilterStage due to 'skip_asset' flag.")
logging.debug(f"Asset '{asset_name_for_log}': Skipping FileRuleFilterStage due to 'skip_asset' flag.")
return context
context.files_to_process: List[FileRule] = []
ignore_patterns: Set[str] = set()
# Step 1: Collect all FILE_IGNORE patterns
if context.asset_rule and context.asset_rule.file_rules:
for file_rule in context.asset_rule.file_rules:
if file_rule.item_type == "FILE_IGNORE" and file_rule.active:
ignore_patterns.add(file_rule.filename_pattern)
logging.debug(
f"Asset '{context.asset_rule.name}': Registering ignore pattern: '{file_rule.filename_pattern}'"
)
if context.asset_rule and context.asset_rule.files:
for file_rule in context.asset_rule.files:
if file_rule.item_type == "FILE_IGNORE": # Removed 'and file_rule.active'
if hasattr(file_rule, 'file_path') and file_rule.file_path:
ignore_patterns.add(file_rule.file_path)
logging.debug(
f"Asset '{asset_name_for_log}': Registering ignore pattern: '{file_rule.file_path}'"
)
else:
logging.warning(f"Asset '{asset_name_for_log}': FILE_IGNORE rule found without a file_path. Skipping this ignore rule.")
else:
logging.debug(f"Asset '{context.asset_rule.name if context.asset_rule else 'Unknown'}': No file rules to process or asset_rule is None.")
logging.debug(f"Asset '{asset_name_for_log}': No file rules (context.asset_rule.files) to process or asset_rule is None.")
# Still need to return context even if there are no rules
logging.info(f"Asset '{context.asset_rule.name if context.asset_rule else 'Unknown'}': 0 file rules queued for processing after filtering.")
logging.info(f"Asset '{asset_name_for_log}': 0 file rules queued for processing after filtering.")
return context
# Step 2: Filter and add processable FileRules
for file_rule in context.asset_rule.file_rules:
if not file_rule.active:
logging.debug(
f"Asset '{context.asset_rule.name}': Skipping inactive file rule '{file_rule.filename_pattern}'."
)
continue
for file_rule in context.asset_rule.files: # Iterate over .files
# Removed 'if not file_rule.active:' check
if file_rule.item_type == "FILE_IGNORE":
# Already processed, skip.
continue
is_ignored = False
# Ensure file_rule.file_path exists before using it with fnmatch
current_file_path = file_rule.file_path if hasattr(file_rule, 'file_path') else None
if not current_file_path:
logging.warning(f"Asset '{asset_name_for_log}': FileRule found without a file_path. Skipping this rule for ignore matching.")
# Decide if this rule should be added or skipped if it has no path
# For now, let's assume it might be an error and not add it if it can't be matched.
# If it should be added by default, this logic needs adjustment.
continue
for ignore_pat in ignore_patterns:
if fnmatch.fnmatch(file_rule.filename_pattern, ignore_pat):
if fnmatch.fnmatch(current_file_path, ignore_pat):
is_ignored = True
logging.debug(
f"Asset '{context.asset_rule.name}': Skipping file rule '{file_rule.filename_pattern}' "
f"Asset '{asset_name_for_log}': Skipping file rule for '{current_file_path}' "
f"due to matching ignore pattern '{ignore_pat}'."
)
break
@ -70,11 +80,11 @@ class FileRuleFilterStage(ProcessingStage):
if not is_ignored:
context.files_to_process.append(file_rule)
logging.debug(
f"Asset '{context.asset_rule.name}': Adding file rule '{file_rule.filename_pattern}' "
f"Asset '{asset_name_for_log}': Adding file rule for '{current_file_path}' "
f"(type: {file_rule.item_type}) to processing queue."
)
logging.info(
f"Asset '{context.asset_rule.name}': {len(context.files_to_process)} file rules queued for processing after filtering."
f"Asset '{asset_name_for_log}': {len(context.files_to_process)} file rules queued for processing after filtering."
)
return context

View File

@ -5,9 +5,9 @@ from typing import List
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from ...rule_structure import FileRule
from ..utils import image_processing_utils as ipu
from ...utils.path_utils import sanitize_filename
from rule_structure import FileRule
from ...utils import image_processing_utils as ipu
from utils.path_utils import sanitize_filename
logger = logging.getLogger(__name__)
@ -30,13 +30,14 @@ class GlossToRoughConversionStage(ProcessingStage):
Returns:
The updated AssetProcessingContext.
"""
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
if context.status_flags.get('skip_asset'):
logger.debug(f"Asset '{context.asset_rule.name}': Skipping GlossToRoughConversionStage due to skip_asset flag.")
logger.debug(f"Asset '{asset_name_for_log}': Skipping GlossToRoughConversionStage due to skip_asset flag.")
return context
if not context.files_to_process or not context.processed_maps_details:
logger.debug(
f"Asset '{context.asset_rule.name}': No files to process or processed_maps_details empty "
f"Asset '{asset_name_for_log}': No files to process or processed_maps_details empty "
f"in GlossToRoughConversionStage. Skipping."
)
return context
@ -44,14 +45,23 @@ class GlossToRoughConversionStage(ProcessingStage):
new_files_to_process: List[FileRule] = []
processed_a_gloss_map = False
logger.info(f"Asset '{context.asset_rule.name}': Starting Gloss to Roughness Conversion Stage.")
logger.info(f"Asset '{asset_name_for_log}': Starting Gloss to Roughness Conversion Stage.")
for idx, file_rule in enumerate(context.files_to_process):
if file_rule.map_type == "GLOSS":
# Assuming FileRule has 'map_type' and 'id' (with a .hex attribute) and 'source_file_path'
# These might need to be checked with hasattr if they are optional or could be missing
if hasattr(file_rule, 'map_type') and file_rule.map_type == "GLOSS":
if not hasattr(file_rule, 'id') or not hasattr(file_rule.id, 'hex'):
logger.warning(f"Asset '{asset_name_for_log}': GLOSS FileRule missing 'id.hex'. Skipping conversion for this rule: {file_rule}")
new_files_to_process.append(file_rule)
continue
map_detail_key = file_rule.id.hex
source_file_path_for_log = file_rule.source_file_path if hasattr(file_rule, 'source_file_path') else "Unknown source path"
if map_detail_key not in context.processed_maps_details:
logger.warning(
f"Asset '{context.asset_rule.name}': GLOSS map '{file_rule.source_file_path}' "
f"Asset '{asset_name_for_log}': GLOSS map '{source_file_path_for_log}' "
f"(ID: {map_detail_key}) found in files_to_process but not in processed_maps_details. "
f"Adding original rule and skipping conversion for this map."
)
@ -62,7 +72,7 @@ class GlossToRoughConversionStage(ProcessingStage):
if map_details.get('status') != 'Processed' or 'temp_processed_file' not in map_details:
logger.warning(
f"Asset '{context.asset_rule.name}': GLOSS map '{file_rule.source_file_path}' "
f"Asset '{asset_name_for_log}': GLOSS map '{source_file_path_for_log}' "
f"(ID: {map_detail_key}) not successfully processed by previous stage or temp file missing. "
f"Status: {map_details.get('status')}. Adding original rule and skipping conversion."
)
@ -74,18 +84,18 @@ class GlossToRoughConversionStage(ProcessingStage):
if not original_temp_path.exists():
logger.error(
f"Asset '{context.asset_rule.name}': Temporary file {original_temp_path_str} for GLOSS map "
f"Asset '{asset_name_for_log}': Temporary file {original_temp_path_str} for GLOSS map "
f"(ID: {map_detail_key}) does not exist. Adding original rule and skipping conversion."
)
new_files_to_process.append(file_rule)
continue
logger.debug(f"Asset '{context.asset_rule.name}': Processing GLOSS map {original_temp_path} for conversion.")
logger.debug(f"Asset '{asset_name_for_log}': Processing GLOSS map {original_temp_path} for conversion.")
image_data = ipu.load_image(original_temp_path)
if image_data is None:
logger.error(
f"Asset '{context.asset_rule.name}': Failed to load image data from {original_temp_path} "
f"Asset '{asset_name_for_log}': Failed to load image data from {original_temp_path} "
f"for GLOSS map (ID: {map_detail_key}). Adding original rule and skipping conversion."
)
new_files_to_process.append(file_rule)
@ -96,14 +106,14 @@ class GlossToRoughConversionStage(ProcessingStage):
if np.issubdtype(image_data.dtype, np.floating):
inverted_image_data = 1.0 - image_data
inverted_image_data = np.clip(inverted_image_data, 0.0, 1.0) # Ensure range for floats
logger.debug(f"Asset '{context.asset_rule.name}': Inverted float image data for {original_temp_path}.")
logger.debug(f"Asset '{asset_name_for_log}': Inverted float image data for {original_temp_path}.")
elif np.issubdtype(image_data.dtype, np.integer):
max_val = np.iinfo(image_data.dtype).max
inverted_image_data = max_val - image_data
logger.debug(f"Asset '{context.asset_rule.name}': Inverted integer image data (max_val: {max_val}) for {original_temp_path}.")
logger.debug(f"Asset '{asset_name_for_log}': Inverted integer image data (max_val: {max_val}) for {original_temp_path}.")
else:
logger.error(
f"Asset '{context.asset_rule.name}': Unsupported image data type {image_data.dtype} "
f"Asset '{asset_name_for_log}': Unsupported image data type {image_data.dtype} "
f"for GLOSS map {original_temp_path}. Cannot invert. Adding original rule."
)
new_files_to_process.append(file_rule)
@ -111,19 +121,22 @@ class GlossToRoughConversionStage(ProcessingStage):
# Save New Temporary (Roughness) Map
# Using original_temp_path.suffix ensures we keep the format (e.g., .png, .exr)
new_temp_filename = f"rough_from_gloss_{sanitize_filename(file_rule.map_type)}_{file_rule.id.hex}{original_temp_path.suffix}"
# Ensure file_rule.map_type exists before using sanitize_filename
map_type_for_filename = file_rule.map_type if hasattr(file_rule, 'map_type') else "unknownmaptype"
new_temp_filename = f"rough_from_gloss_{sanitize_filename(map_type_for_filename)}_{file_rule.id.hex}{original_temp_path.suffix}"
new_temp_path = context.engine_temp_dir / new_temp_filename
save_success = ipu.save_image(new_temp_path, inverted_image_data)
if save_success:
logger.info(
f"Asset '{context.asset_rule.name}': Converted GLOSS map {original_temp_path} "
f"Asset '{asset_name_for_log}': Converted GLOSS map {original_temp_path} "
f"to ROUGHNESS map {new_temp_path}."
)
modified_file_rule = file_rule.model_copy(deep=True)
modified_file_rule.map_type = "ROUGHNESS"
# Assuming FileRule has model_copy method
modified_file_rule = file_rule.model_copy(deep=True) if hasattr(file_rule, 'model_copy') else file_rule
modified_file_rule.map_type = "ROUGHNESS" # Ensure map_type can be set
# Update context.processed_maps_details for the original file_rule.id.hex
context.processed_maps_details[map_detail_key]['temp_processed_file'] = str(new_temp_path)
@ -134,7 +147,7 @@ class GlossToRoughConversionStage(ProcessingStage):
processed_a_gloss_map = True
else:
logger.error(
f"Asset '{context.asset_rule.name}': Failed to save inverted ROUGHNESS map to {new_temp_path} "
f"Asset '{asset_name_for_log}': Failed to save inverted ROUGHNESS map to {new_temp_path} "
f"for original GLOSS map (ID: {map_detail_key}). Adding original rule."
)
new_files_to_process.append(file_rule)
@ -145,11 +158,11 @@ class GlossToRoughConversionStage(ProcessingStage):
if processed_a_gloss_map:
logger.info(
f"Asset '{context.asset_rule.name}': Gloss to Roughness conversion stage successfully processed one or more maps and updated file list."
f"Asset '{asset_name_for_log}': Gloss to Roughness conversion stage successfully processed one or more maps and updated file list."
)
else:
logger.debug(
f"Asset '{context.asset_rule.name}': No gloss maps were successfully converted in GlossToRoughConversionStage. "
f"Asset '{asset_name_for_log}': No gloss maps were successfully converted in GlossToRoughConversionStage. "
f"File list for next stage contains original non-gloss maps and any gloss maps that failed conversion."
)

View File

@ -1,3 +1,5 @@
import uuid
import dataclasses
import os
import logging
from pathlib import Path
@ -6,10 +8,10 @@ from typing import Optional, Tuple, Dict
import cv2
import numpy as np
from ..base_stage import ProcessingStage
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from ....rule_structure import FileRule, TransformSettings
from ....utils.path_utils import sanitize_filename
from rule_structure import FileRule
from utils.path_utils import sanitize_filename
from ...utils import image_processing_utils as ipu
logger = logging.getLogger(__name__)
@ -26,220 +28,245 @@ class IndividualMapProcessingStage(ProcessingStage):
"""
Executes the individual map processing logic.
"""
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
if context.status_flags.get('skip_asset', False):
logger.info(f"Asset {context.asset_id}: Skipping individual map processing due to skip_asset flag.")
logger.info(f"Asset '{asset_name_for_log}': Skipping individual map processing due to skip_asset flag.")
return context
if not hasattr(context, 'processed_maps_details') or context.processed_maps_details is None:
context.processed_maps_details = {}
logger.debug(f"Asset {context.asset_id}: Initialized processed_maps_details.")
logger.debug(f"Asset '{asset_name_for_log}': Initialized processed_maps_details.")
if not context.files_to_process:
logger.info(f"Asset {context.asset_id}: No files to process in this stage.")
logger.info(f"Asset '{asset_name_for_log}': No files to process in this stage.")
return context
source_base_path = Path(context.asset_rule.source_path)
if not source_base_path.is_dir():
logger.error(f"Asset {context.asset_id}: Source path '{source_base_path}' is not a valid directory. Skipping individual map processing.")
# Source path for the asset group comes from SourceRule
if not context.source_rule or not context.source_rule.input_path:
logger.error(f"Asset '{asset_name_for_log}': SourceRule or SourceRule.input_path is not set. Cannot determine source base path.")
context.status_flags['individual_map_processing_failed'] = True
# Potentially mark all file_rules as failed if source path is invalid
for file_rule in context.files_to_process:
if file_rule.item_type.startswith("MAP_"): # General check for map types
self._update_file_rule_status(context, file_rule.id.hex, 'Failed', details="Source path invalid")
# Mark all file_rules as failed
for fr_idx, file_rule_to_fail in enumerate(context.files_to_process):
temp_id_for_fail = f"fr_fail_{fr_idx}" # Temporary ID for status update
map_type_for_fail = file_rule_to_fail.item_type_override or file_rule_to_fail.item_type or "UnknownMapType"
self._update_file_rule_status(context, temp_id_for_fail, 'Failed', map_type=map_type_for_fail, details="SourceRule.input_path missing")
return context
# The workspace_path in the context should be the directory where files are extracted/available.
source_base_path = context.workspace_path
if not source_base_path.is_dir():
logger.error(f"Asset '{asset_name_for_log}': Workspace path '{source_base_path}' is not a valid directory.")
context.status_flags['individual_map_processing_failed'] = True
for fr_idx, file_rule_to_fail in enumerate(context.files_to_process):
temp_id_for_fail = f"fr_fail_{fr_idx}" # Use a temporary unique ID for this status update
map_type_for_fail = file_rule_to_fail.item_type_override or file_rule_to_fail.item_type or "UnknownMapType"
self._update_file_rule_status(context, temp_id_for_fail, 'Failed', map_type=map_type_for_fail, details="Workspace path invalid")
return context
for file_rule in context.files_to_process:
# Primarily focus on "MAP_COL", "MAP_NORM", "MAP_ROUGH", etc.
# For now, let's assume any item_type starting with "MAP_" is a candidate
# unless it's specifically handled by another stage (e.g., "MAP_GEN" might be).
# The prompt mentions "MAP_COL" primarily.
# Let's be a bit more specific for now, focusing on types that are typically direct file mappings.
# This can be refined based on how `item_type` is used for generated maps.
# For now, we'll process any `FileRule` that isn't explicitly a generated map type
# that would be handled *after* individual processing (e.g. a composite map).
# A simple check for now:
if not file_rule.item_type or not file_rule.item_type.startswith("MAP_") or file_rule.item_type == "MAP_GEN_COMPOSITE": # Example exclusion
logger.debug(f"Asset {context.asset_id}, FileRule {file_rule.id.hex} ({file_rule.map_type}): Skipping, item_type '{file_rule.item_type}' not targeted for individual processing.")
for file_rule_idx, file_rule in enumerate(context.files_to_process):
# Generate a unique ID for this file_rule processing instance for processed_maps_details
current_map_id_hex = f"map_{file_rule_idx}_{uuid.uuid4().hex[:8]}"
current_map_type = file_rule.item_type_override or file_rule.item_type or "UnknownMapType"
if not current_map_type or not current_map_type.startswith("MAP_") or current_map_type == "MAP_GEN_COMPOSITE":
logger.debug(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}': Skipping, item_type '{current_map_type}' not targeted for individual processing.")
continue
logger.info(f"Asset {context.asset_id}, FileRule {file_rule.id.hex} ({file_rule.map_type}): Starting individual processing.")
logger.info(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (Type: {current_map_type}, ID: {current_map_id_hex}): Starting individual processing.")
# A. Find Source File
source_file_path = self._find_source_file(source_base_path, file_rule.filename_pattern, context.asset_id, file_rule.id.hex)
# A. Find Source File (using file_rule.file_path as the pattern relative to source_base_path)
# The _find_source_file might need adjustment if file_rule.file_path is absolute or needs complex globbing.
# For now, assume file_rule.file_path is a relative pattern or exact name.
source_file_path = self._find_source_file(source_base_path, file_rule.file_path, asset_name_for_log, current_map_id_hex)
if not source_file_path:
logger.error(f"Asset {context.asset_id}, FileRule {file_rule.id.hex} ({file_rule.map_type}): Source file not found with pattern '{file_rule.filename_pattern}' in '{source_base_path}'.")
self._update_file_rule_status(context, file_rule.id.hex, 'Failed', map_type=file_rule.map_type, details="Source file not found")
logger.error(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Source file not found with path/pattern '{file_rule.file_path}' in '{source_base_path}'.")
self._update_file_rule_status(context, current_map_id_hex, 'Failed', map_type=current_map_type, details="Source file not found")
continue
# B. Load and Transform Image
image_data: Optional[np.ndarray] = ipu.load_image(str(source_file_path))
if image_data is None:
logger.error(f"Asset {context.asset_id}, FileRule {file_rule.id.hex} ({file_rule.map_type}): Failed to load image from '{source_file_path}'.")
self._update_file_rule_status(context, file_rule.id.hex, 'Failed', map_type=file_rule.map_type, source_file=str(source_file_path), details="Image load failed")
logger.error(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Failed to load image from '{source_file_path}'.")
self._update_file_rule_status(context, current_map_id_hex, 'Failed', map_type=current_map_type, source_file=str(source_file_path), details="Image load failed")
continue
original_height, original_width = image_data.shape[:2]
logger.debug(f"Asset {context.asset_id}, FileRule {file_rule.id.hex}: Loaded image '{source_file_path}' with dimensions {original_width}x{original_height}.")
logger.debug(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Loaded image '{source_file_path}' with dimensions {original_width}x{original_height}.")
# Initialize transform settings with defaults
transform_settings = {
"target_width": 2048,
"target_height": None,
"resize_mode": "fit",
"ensure_pot": False,
"allow_upscale": False,
"resize_filter": "AREA",
"color_profile_management": False,
"target_color_profile": "sRGB",
"output_format_settings": None
}
# Attempt to load transform settings from file_rule.channel_merge_instructions
if file_rule.channel_merge_instructions and 'transform' in file_rule.channel_merge_instructions:
custom_transform_settings = file_rule.channel_merge_instructions['transform']
if isinstance(custom_transform_settings, dict):
transform_settings.update(custom_transform_settings)
logger.info(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Loaded transform settings from file_rule.channel_merge_instructions.")
else:
logger.warning(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): 'transform' in channel_merge_instructions is not a dictionary. Using defaults.")
# TODO: Implement fallback to context.config_obj for global/item_type specific transform settings
# else:
# # Example: config_transforms = context.config_obj.get_transform_settings(file_rule.item_type or file_rule.item_type_override)
# # if config_transforms:
# # transform_settings.update(config_transforms)
transform: TransformSettings = file_rule.transform_settings
target_width, target_height = ipu.calculate_target_dimensions(
original_width, original_height,
transform.target_width, transform.target_height,
transform.resize_mode,
transform.ensure_pot,
transform.allow_upscale
transform_settings['target_width'], transform_settings['target_height'],
transform_settings['resize_mode'],
transform_settings['ensure_pot'],
transform_settings['allow_upscale']
)
logger.debug(f"Asset {context.asset_id}, FileRule {file_rule.id.hex}: Original dims: ({original_width},{original_height}), Calculated target dims: ({target_width},{target_height})")
logger.debug(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Original dims: ({original_width},{original_height}), Calculated target dims: ({target_width},{target_height}) using sourced transforms.")
processed_image_data = image_data.copy() # Start with a copy
processed_image_data = image_data.copy()
if (target_width, target_height) != (original_width, original_height):
logger.info(f"Asset {context.asset_id}, FileRule {file_rule.id.hex}: Resizing from ({original_width},{original_height}) to ({target_width},{target_height}).")
# Map resize_filter string to cv2 interpolation constant
interpolation_map = {
"NEAREST": cv2.INTER_NEAREST,
"LINEAR": cv2.INTER_LINEAR,
"CUBIC": cv2.INTER_CUBIC,
"AREA": cv2.INTER_AREA, # Good for downscaling
"LANCZOS4": cv2.INTER_LANCZOS4
}
interpolation = interpolation_map.get(transform.resize_filter.upper(), cv2.INTER_AREA) # Default to INTER_AREA
logger.info(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Resizing from ({original_width},{original_height}) to ({target_width},{target_height}).")
interpolation_map = {"NEAREST": cv2.INTER_NEAREST, "LINEAR": cv2.INTER_LINEAR, "CUBIC": cv2.INTER_CUBIC, "AREA": cv2.INTER_AREA, "LANCZOS4": cv2.INTER_LANCZOS4}
interpolation = interpolation_map.get(transform_settings['resize_filter'].upper(), cv2.INTER_AREA)
processed_image_data = ipu.resize_image(processed_image_data, target_width, target_height, interpolation=interpolation)
if processed_image_data is None: # Should not happen if resize_image handles errors, but good practice
logger.error(f"Asset {context.asset_id}, FileRule {file_rule.id.hex} ({file_rule.map_type}): Failed to resize image.")
self._update_file_rule_status(context, file_rule.id.hex, 'Failed', map_type=file_rule.map_type, source_file=str(source_file_path), original_dimensions=(original_width, original_height), details="Image resize failed")
if processed_image_data is None:
logger.error(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Failed to resize image.")
self._update_file_rule_status(context, current_map_id_hex, 'Failed', map_type=current_map_type, source_file=str(source_file_path), original_dimensions=(original_width, original_height), details="Image resize failed")
continue
# Color Space Conversion (simplified)
# Assuming ipu.load_image loads as BGR if color.
# This needs more robust handling of source color profiles if they are known.
if transform.color_profile_management and transform.target_color_profile == "RGB":
if len(processed_image_data.shape) == 3 and processed_image_data.shape[2] == 3: # Check if it's a color image
logger.info(f"Asset {context.asset_id}, FileRule {file_rule.id.hex}: Converting BGR to RGB.")
if transform_settings['color_profile_management'] and transform_settings['target_color_profile'] == "RGB":
if len(processed_image_data.shape) == 3 and processed_image_data.shape[2] == 3:
logger.info(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Converting BGR to RGB.")
processed_image_data = ipu.convert_bgr_to_rgb(processed_image_data)
elif len(processed_image_data.shape) == 3 and processed_image_data.shape[2] == 4: # Check for BGRA
logger.info(f"Asset {context.asset_id}, FileRule {file_rule.id.hex}: Converting BGRA to RGBA.")
elif len(processed_image_data.shape) == 3 and processed_image_data.shape[2] == 4:
logger.info(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Converting BGRA to RGBA.")
processed_image_data = ipu.convert_bgra_to_rgba(processed_image_data)
# C. Save Temporary Processed Map
# Ensure engine_temp_dir exists (orchestrator should handle this, but good to be safe)
if not context.engine_temp_dir.exists():
try:
context.engine_temp_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Asset {context.asset_id}: Created engine_temp_dir at '{context.engine_temp_dir}'")
logger.info(f"Asset '{asset_name_for_log}': Created engine_temp_dir at '{context.engine_temp_dir}'")
except OSError as e:
logger.error(f"Asset {context.asset_id}: Failed to create engine_temp_dir '{context.engine_temp_dir}': {e}")
self._update_file_rule_status(context, file_rule.id.hex, 'Failed', map_type=file_rule.map_type, source_file=str(source_file_path), details="Failed to create temp directory")
continue # Or potentially fail the whole asset processing here
logger.error(f"Asset '{asset_name_for_log}': Failed to create engine_temp_dir '{context.engine_temp_dir}': {e}")
self._update_file_rule_status(context, current_map_id_hex, 'Failed', map_type=current_map_type, source_file=str(source_file_path), details="Failed to create temp directory")
continue
temp_filename_suffix = Path(source_file_path).suffix
# Use a more descriptive name if possible, including map_type
safe_map_type = sanitize_filename(file_rule.map_type if file_rule.map_type else "unknown_map")
temp_output_filename = f"processed_{safe_map_type}_{file_rule.id.hex}{temp_filename_suffix}"
safe_map_type_filename = sanitize_filename(current_map_type)
temp_output_filename = f"processed_{safe_map_type_filename}_{current_map_id_hex}{temp_filename_suffix}"
temp_output_path = context.engine_temp_dir / temp_output_filename
# Consider output_format_settings from transform if they apply here
# For now, save_image handles basic saving.
# Example: cv2.imwrite params for quality for JPG, compression for PNG
save_params = []
if transform.output_format_settings:
if transform_settings['output_format_settings']:
if temp_filename_suffix.lower() in ['.jpg', '.jpeg']:
quality = transform.output_format_settings.get('quality', 95)
quality = transform_settings['output_format_settings'].get('quality', 95)
save_params = [cv2.IMWRITE_JPEG_QUALITY, quality]
elif temp_filename_suffix.lower() == '.png':
compression = transform.output_format_settings.get('compression_level', 3) # 0-9, 3 is default
compression = transform_settings['output_format_settings'].get('compression_level', 3)
save_params = [cv2.IMWRITE_PNG_COMPRESSION, compression]
# Add more formats as needed (e.g., EXR, TIFF)
save_success = ipu.save_image(str(temp_output_path), processed_image_data, params=save_params)
if not save_success:
logger.error(f"Asset {context.asset_id}, FileRule {file_rule.id.hex} ({file_rule.map_type}): Failed to save temporary image to '{temp_output_path}'.")
self._update_file_rule_status(
context, file_rule.id.hex, 'Failed',
map_type=file_rule.map_type,
source_file=str(source_file_path),
original_dimensions=(original_width, original_height),
processed_dimensions=(processed_image_data.shape[1], processed_image_data.shape[0]) if processed_image_data is not None else None,
details="Temporary image save failed"
)
logger.error(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Failed to save temporary image to '{temp_output_path}'.")
self._update_file_rule_status(context, current_map_id_hex, 'Failed', map_type=current_map_type, source_file=str(source_file_path), original_dimensions=(original_width, original_height), processed_dimensions=(processed_image_data.shape[1], processed_image_data.shape[0]) if processed_image_data is not None else None, details="Temporary image save failed")
continue
logger.info(f"Asset {context.asset_id}, FileRule {file_rule.id.hex} ({file_rule.map_type}): Successfully processed and saved temporary map to '{temp_output_path}'.")
logger.info(f"Asset '{asset_name_for_log}', FileRule path '{file_rule.file_path}' (ID: {current_map_id_hex}): Successfully processed and saved temporary map to '{temp_output_path}'.")
# D. Update Context
self._update_file_rule_status(
context, file_rule.id.hex, 'Processed',
map_type=file_rule.map_type,
source_file=str(source_file_path),
temp_processed_file=str(temp_output_path),
original_dimensions=(original_width, original_height),
processed_dimensions=(processed_image_data.shape[1], processed_image_data.shape[0]),
details="Successfully processed"
)
self._update_file_rule_status(context, current_map_id_hex, 'Processed', map_type=current_map_type, source_file=str(source_file_path), temp_processed_file=str(temp_output_path), original_dimensions=(original_width, original_height), processed_dimensions=(processed_image_data.shape[1], processed_image_data.shape[0]), details="Successfully processed")
# Optional: Update context.asset_metadata['processed_files']
if 'processed_files' not in context.asset_metadata:
context.asset_metadata['processed_files'] = []
context.asset_metadata['processed_files'].append({
'file_rule_id': file_rule.id.hex,
'processed_map_key': current_map_id_hex, # Changed from file_rule_id
'path': str(temp_output_path),
'type': 'temporary_map',
'map_type': file_rule.map_type
'map_type': current_map_type
})
logger.info(f"Asset {context.asset_id}: Finished individual map processing stage.")
logger.info(f"Asset '{asset_name_for_log}': Finished individual map processing stage.")
return context
def _find_source_file(self, base_path: Path, pattern: str, asset_id: str, file_rule_id_hex: str) -> Optional[Path]:
def _find_source_file(self, base_path: Path, pattern: str, asset_name_for_log: str, current_map_id_hex: str) -> Optional[Path]: # asset_id -> asset_name_for_log, file_rule_id_hex -> current_map_id_hex
"""
Finds a single source file matching the pattern within the base_path.
Adapts logic from ProcessingEngine._find_source_file.
"""
if not pattern:
logger.warning(f"Asset {asset_id}, FileRule {file_rule_id_hex}: Empty filename pattern provided.")
if not pattern: # pattern is now file_rule.file_path
logger.warning(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: Empty file_path provided in FileRule.")
return None
# If pattern is an absolute path, use it directly
potential_abs_path = Path(pattern)
if potential_abs_path.is_absolute() and potential_abs_path.exists():
logger.debug(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: file_path '{pattern}' is absolute and exists. Using it directly.")
return potential_abs_path
elif potential_abs_path.is_absolute():
logger.warning(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: file_path '{pattern}' is absolute but does not exist.")
# Fall through to try resolving against base_path if it's just a name/relative pattern
# Treat pattern as relative to base_path
# This could be an exact name or a glob pattern
try:
# Using rglob for potentially nested structures, though original might have been simpler.
# If pattern is exact filename, it will also work.
# If pattern is a glob, it will search.
matched_files = list(base_path.rglob(pattern))
if not matched_files:
logger.debug(f"Asset {asset_id}, FileRule {file_rule_id_hex}: No files found matching pattern '{pattern}' in '{base_path}' (recursive).")
# Try non-recursive if rglob fails and pattern might be for top-level
matched_files_non_recursive = list(base_path.glob(pattern))
if matched_files_non_recursive:
logger.debug(f"Asset {asset_id}, FileRule {file_rule_id_hex}: Found {len(matched_files_non_recursive)} files non-recursively. Using first: {matched_files_non_recursive[0]}")
return matched_files_non_recursive[0]
return None
if len(matched_files) > 1:
logger.warning(f"Asset {asset_id}, FileRule {file_rule_id_hex}: Multiple files ({len(matched_files)}) found for pattern '{pattern}' in '{base_path}'. Using the first one: {matched_files[0]}. Files: {matched_files}")
return matched_files[0]
# First, check if pattern is an exact relative path
exact_match_path = base_path / pattern
if exact_match_path.exists() and exact_match_path.is_file():
logger.debug(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: Found exact match for '{pattern}' at '{exact_match_path}'.")
return exact_match_path
# If not an exact match, try as a glob pattern (recursive)
matched_files_rglob = list(base_path.rglob(pattern))
if matched_files_rglob:
if len(matched_files_rglob) > 1:
logger.warning(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: Multiple files ({len(matched_files_rglob)}) found for pattern '{pattern}' in '{base_path}' (recursive). Using first: {matched_files_rglob[0]}. Files: {matched_files_rglob}")
return matched_files_rglob[0]
# Try non-recursive glob if rglob fails
matched_files_glob = list(base_path.glob(pattern))
if matched_files_glob:
if len(matched_files_glob) > 1:
logger.warning(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: Multiple files ({len(matched_files_glob)}) found for pattern '{pattern}' in '{base_path}' (non-recursive). Using first: {matched_files_glob[0]}. Files: {matched_files_glob}")
return matched_files_glob[0]
logger.debug(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: No files found matching pattern '{pattern}' in '{base_path}' (exact, recursive, or non-recursive).")
return None
except Exception as e:
logger.error(f"Asset {asset_id}, FileRule {file_rule_id_hex}: Error searching for file with pattern '{pattern}' in '{base_path}': {e}")
logger.error(f"Asset '{asset_name_for_log}', Map ID {current_map_id_hex}: Error searching for file with pattern '{pattern}' in '{base_path}': {e}")
return None
def _update_file_rule_status(self, context: AssetProcessingContext, file_rule_id_hex: str, status: str, **kwargs):
"""Helper to update processed_maps_details for a file_rule."""
if file_rule_id_hex not in context.processed_maps_details:
context.processed_maps_details[file_rule_id_hex] = {}
def _update_file_rule_status(self, context: AssetProcessingContext, map_id_hex: str, status: str, **kwargs): # file_rule_id_hex -> map_id_hex
"""Helper to update processed_maps_details for a map."""
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
if map_id_hex not in context.processed_maps_details:
context.processed_maps_details[map_id_hex] = {}
context.processed_maps_details[file_rule_id_hex]['status'] = status
context.processed_maps_details[map_id_hex]['status'] = status
for key, value in kwargs.items():
context.processed_maps_details[file_rule_id_hex][key] = value
context.processed_maps_details[map_id_hex][key] = value
# Ensure essential keys are present even on failure, if known
if 'map_type' not in context.processed_maps_details[file_rule_id_hex] and 'map_type' in kwargs:
context.processed_maps_details[file_rule_id_hex]['map_type'] = kwargs['map_type']
if 'map_type' not in context.processed_maps_details[map_id_hex] and 'map_type' in kwargs:
context.processed_maps_details[map_id_hex]['map_type'] = kwargs['map_type']
# Add formatted resolution names
if 'original_dimensions' in kwargs and isinstance(kwargs['original_dimensions'], tuple) and len(kwargs['original_dimensions']) == 2:
orig_w, orig_h = kwargs['original_dimensions']
context.processed_maps_details[map_id_hex]['original_resolution_name'] = f"{orig_w}x{orig_h}"
if status == 'Processed' and 'processed_dimensions' in kwargs and isinstance(kwargs['processed_dimensions'], tuple) and len(kwargs['processed_dimensions']) == 2:
proc_w, proc_h = kwargs['processed_dimensions']
context.processed_maps_details[map_id_hex]['processed_resolution_name'] = f"{proc_w}x{proc_h}"
elif 'processed_dimensions' in kwargs: # If present but not as expected, log or handle
logger.warning(f"Asset '{asset_name_for_log}', Map ID {map_id_hex}: 'processed_dimensions' present but not a valid tuple: {kwargs['processed_dimensions']}")
logger.debug(f"Asset {context.asset_id}, FileRule {file_rule_id_hex}: Status updated to '{status}'. Details: {kwargs}")
# Log all details being stored for clarity, including the newly added resolution names
log_details = context.processed_maps_details[map_id_hex].copy()
logger.debug(f"Asset '{asset_name_for_log}', Map ID {map_id_hex}: Status updated to '{status}'. Details: {log_details}")

View File

@ -5,10 +5,10 @@ from typing import Dict, Optional, List, Tuple
import numpy as np
import cv2 # For potential direct cv2 operations if ipu doesn't cover all merge needs
from ..base_stage import ProcessingStage
from ...asset_context import AssetProcessingContext
from ....rule_structure import FileRule, MergeSettings, MergeInputChannel
from ....utils.path_utils import sanitize_filename
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from rule_structure import FileRule
from utils.path_utils import sanitize_filename
from ...utils import image_processing_utils as ipu
@ -30,281 +30,244 @@ class MapMergingStage(ProcessingStage):
Returns:
The updated asset processing context.
"""
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
if context.status_flags.get('skip_asset'):
logger.info(f"Skipping map merging for asset {context.asset_name} as skip_asset flag is set.")
logger.info(f"Skipping map merging for asset {asset_name_for_log} as skip_asset flag is set.")
return context
if not hasattr(context, 'merged_maps_details'):
context.merged_maps_details = {}
if not hasattr(context, 'processed_maps_details'):
logger.warning(f"Asset {context.asset_name}: 'processed_maps_details' not found in context. Cannot perform map merging.")
logger.warning(f"Asset {asset_name_for_log}: 'processed_maps_details' not found in context. Cannot perform map merging.")
return context
if not context.files_to_process:
logger.info(f"Asset {context.asset_name}: No files_to_process defined. Skipping map merging.")
return context
if not context.files_to_process: # This list might not be relevant if merge rules are defined elsewhere or implicitly
logger.info(f"Asset {asset_name_for_log}: No files_to_process defined. This stage might rely on config or processed_maps_details directly for merge rules.")
# Depending on design, this might not be an error, so we don't return yet.
logger.info(f"Starting MapMergingStage for asset: {context.asset_name}")
logger.info(f"Starting MapMergingStage for asset: {asset_name_for_log}")
for merge_rule in context.files_to_process:
# TODO: The logic for identifying merge rules and their inputs needs significant rework
# as FileRule no longer has 'id' or 'merge_settings' directly in the way this stage expects.
# Merge rules are likely defined in the main configuration (context.config_obj.map_merge_rules)
# and need to be matched against available maps in context.processed_maps_details.
# Placeholder for the loop that would iterate over context.config_obj.map_merge_rules
# For now, this stage will effectively do nothing until that logic is implemented.
# Example of how one might start to adapt:
# for configured_merge_rule in context.config_obj.map_merge_rules:
# output_map_type = configured_merge_rule.get('output_map_type')
# inputs_config = configured_merge_rule.get('inputs') # e.g. {"R": "NORMAL", "G": "ROUGHNESS"}
# # ... then find these input map_types in context.processed_maps_details ...
# # ... and perform the merge ...
# # This is a complex change beyond simple attribute renaming.
# The following is the original loop structure, which will likely fail due to missing attributes on FileRule.
# Keeping it commented out to show what was there.
"""
for merge_rule in context.files_to_process: # This iteration logic is likely incorrect for merge rules
if not isinstance(merge_rule, FileRule) or merge_rule.item_type != "MAP_MERGE":
continue
if not merge_rule.merge_settings:
logger.error(f"Asset {context.asset_name}, Rule ID {merge_rule.id.hex}: Merge rule for map_type '{merge_rule.map_type}' is missing merge_settings. Skipping this merge.")
context.merged_maps_details[merge_rule.id.hex] = {
'map_type': merge_rule.map_type,
'status': 'Failed',
'reason': 'Missing merge_settings in FileRule.'
}
# FileRule does not have merge_settings or id.hex
# This entire block needs to be re-thought based on where merge rules are defined.
# Assuming merge_rule_id_hex would be a generated UUID for this operation.
merge_rule_id_hex = f"merge_op_{uuid.uuid4().hex[:8]}"
current_map_type = merge_rule.item_type_override or merge_rule.item_type
logger.error(f"Asset {asset_name_for_log}, Potential Merge for {current_map_type}: Merge rule processing needs rework. FileRule lacks 'merge_settings' and 'id'. Skipping this rule.")
context.merged_maps_details[merge_rule_id_hex] = {
'map_type': current_map_type,
'status': 'Failed',
'reason': 'Merge rule processing logic in MapMergingStage needs refactor due to FileRule changes.'
}
continue
"""
# For now, let's assume no merge rules are processed until the logic is fixed.
num_merge_rules_attempted = 0
# If context.config_obj.map_merge_rules exists, iterate it here.
# The original code iterated context.files_to_process looking for item_type "MAP_MERGE".
# This implies FileRule objects were being used to define merge operations, which is no longer the case
# if 'merge_settings' and 'id' were removed from FileRule.
# The core merge rules are in context.config_obj.map_merge_rules
# Each rule in there defines an output_map_type and its inputs.
config_merge_rules = context.config_obj.map_merge_rules
if not config_merge_rules:
logger.info(f"Asset {asset_name_for_log}: No map_merge_rules found in configuration. Skipping map merging.")
return context
for rule_idx, configured_merge_rule in enumerate(config_merge_rules):
output_map_type = configured_merge_rule.get('output_map_type')
inputs_map_type_to_channel = configured_merge_rule.get('inputs') # e.g. {"R": "NRM", "G": "NRM", "B": "ROUGH"}
default_values = configured_merge_rule.get('defaults', {}) # e.g. {"R": 0.5, "G": 0.5, "B": 0.5}
# output_bit_depth_rule = configured_merge_rule.get('output_bit_depth', 'respect_inputs') # Not used yet
if not output_map_type or not inputs_map_type_to_channel:
logger.warning(f"Asset {asset_name_for_log}: Invalid configured_merge_rule at index {rule_idx}. Missing 'output_map_type' or 'inputs'. Rule: {configured_merge_rule}")
continue
merge_settings: MergeSettings = merge_rule.merge_settings
output_map_type = merge_rule.map_type
rule_id_hex = merge_rule.id.hex
logger.info(f"Processing MAP_MERGE rule for '{output_map_type}' (ID: {rule_id_hex})")
num_merge_rules_attempted +=1
merge_op_id = f"merge_{sanitize_filename(output_map_type)}_{rule_idx}"
logger.info(f"Asset {asset_name_for_log}: Processing configured merge rule for '{output_map_type}' (Op ID: {merge_op_id})")
loaded_input_maps: Dict[str, np.ndarray] = {}
input_map_paths: Dict[str, str] = {}
target_dims: Optional[Tuple[int, int]] = None # width, height
loaded_input_maps: Dict[str, np.ndarray] = {} # Key: input_map_type (e.g. "NRM"), Value: image_data
input_map_paths: Dict[str, str] = {} # Key: input_map_type, Value: path_str
target_dims: Optional[Tuple[int, int]] = None
all_inputs_valid = True
# A. Load Input Maps for Merging
if not merge_settings.input_maps:
logger.warning(f"Asset {context.asset_name}, Rule ID {rule_id_hex}: No input_maps defined in merge_settings for '{output_map_type}'. Skipping this merge.")
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': 'No input_maps defined in merge_settings.'
}
continue
for input_map_config in merge_settings.input_maps:
input_rule_id_hex = input_map_config.file_rule_id.hex
processed_detail = context.processed_maps_details.get(input_rule_id_hex)
if not processed_detail or processed_detail.get('status') != 'Processed':
error_msg = f"Input map (Rule ID: {input_rule_id_hex}) for merge rule '{output_map_type}' (Rule ID: {rule_id_hex}) not found or not processed. Details: {processed_detail}"
logger.error(error_msg)
all_inputs_valid = False
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': f"Input map {input_rule_id_hex} not processed or missing."
}
break
# Find and load input maps from processed_maps_details
# This assumes one processed map per map_type. If multiple variants exist, this needs refinement.
required_input_map_types = set(inputs_map_type_to_channel.values())
for required_map_type in required_input_map_types:
found_processed_map = None
processed_map_key = None
for p_key, p_details in context.processed_maps_details.items():
processed_map_type_in_details = p_details.get('map_type')
# Check for direct match or match with "MAP_" prefix
if (processed_map_type_in_details == required_map_type or \
processed_map_type_in_details == f"MAP_{required_map_type}") and \
p_details.get('status') == 'Processed':
found_processed_map = p_details
processed_map_key = p_key # The UUID hex key from individual processing
break
temp_processed_file_path = Path(processed_detail['temp_processed_file'])
if not temp_processed_file_path.exists():
error_msg = f"Input map file {temp_processed_file_path} for merge rule '{output_map_type}' (Rule ID: {rule_id_hex}) does not exist."
logger.error(error_msg)
if not found_processed_map:
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Required input map_type '{required_map_type}' for output '{output_map_type}' not found or not processed in processed_maps_details.")
# Option: Use default value for the entire map if one could be constructed for this map_type
# For now, we fail the merge if a required map is missing.
all_inputs_valid = False
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': f"Input map file {temp_processed_file_path} not found."
}
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Required input map_type '{required_map_type}' missing."}
break # Break from finding inputs for this merge rule
temp_file_path = Path(found_processed_map['temp_processed_file'])
if not temp_file_path.exists():
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Temp file {temp_file_path} for input map_type '{required_map_type}' does not exist.")
all_inputs_valid = False
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Temp file for input '{required_map_type}' missing."}
break
try:
image_data = ipu.load_image(temp_processed_file_path)
image_data = ipu.load_image(temp_file_path)
if image_data is None: raise ValueError("Loaded image is None")
except Exception as e:
logger.error(f"Error loading image {temp_processed_file_path} for merge rule '{output_map_type}' (Rule ID: {rule_id_hex}): {e}")
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error loading image {temp_file_path} for input map_type '{required_map_type}': {e}")
all_inputs_valid = False
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': f"Error loading input image {temp_processed_file_path}."
}
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Error loading input '{required_map_type}'."}
break
if image_data is None:
logger.error(f"Failed to load image data from {temp_processed_file_path} for merge rule '{output_map_type}' (Rule ID: {rule_id_hex}).")
all_inputs_valid = False
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': f"Failed to load image data from {temp_processed_file_path}."
}
break
loaded_input_maps[required_map_type] = image_data
input_map_paths[required_map_type] = str(temp_file_path)
loaded_input_maps[input_rule_id_hex] = image_data
input_map_paths[input_rule_id_hex] = str(temp_processed_file_path)
current_dims = (image_data.shape[1], image_data.shape[0]) # width, height
current_dims = (image_data.shape[1], image_data.shape[0])
if target_dims is None:
target_dims = current_dims
logger.debug(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Set target dimensions to {target_dims} from first input {temp_processed_file_path}.")
elif current_dims != target_dims:
logger.warning(f"Input map {temp_processed_file_path} for merge rule '{output_map_type}' (ID: {rule_id_hex}) has dimensions {current_dims}, but target is {target_dims}. Resizing.")
logger.warning(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Input map '{required_map_type}' dims {current_dims} differ from target {target_dims}. Resizing.")
try:
image_data = ipu.resize_image(image_data, target_dims[0], target_dims[1])
if image_data is None:
raise ValueError("Resize operation returned None.")
loaded_input_maps[input_rule_id_hex] = image_data
if image_data is None: raise ValueError("Resize returned None")
loaded_input_maps[required_map_type] = image_data
except Exception as e:
logger.error(f"Failed to resize image {temp_processed_file_path} for merge rule '{output_map_type}' (ID: {rule_id_hex}): {e}")
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Failed to resize '{required_map_type}': {e}")
all_inputs_valid = False
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': f"Failed to resize input image {temp_processed_file_path}."
}
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f"Failed to resize input '{required_map_type}'."}
break
if not all_inputs_valid:
# Failure already logged and recorded in context.merged_maps_details
logger.warning(f"Skipping merge for '{output_map_type}' (ID: {rule_id_hex}) due to invalid inputs.")
logger.warning(f"Asset {asset_name_for_log}: Skipping merge for Op ID {merge_op_id} ('{output_map_type}') due to invalid inputs.")
continue
if target_dims is None: # Should not happen if all_inputs_valid is true and there was at least one input map
logger.error(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Target dimensions not determined despite valid inputs. This indicates an issue with input map loading or an empty input_maps list that wasn't caught.")
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': 'Target dimensions could not be determined.'
}
if not loaded_input_maps or target_dims is None:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: No input maps loaded or target_dims not set for '{output_map_type}'. This shouldn't happen if all_inputs_valid was true.")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': 'Internal error: input maps not loaded or target_dims missing.'}
continue
# Determine output channels (e.g., 3 for RGB, 1 for Grayscale)
# This depends on the keys in inputs_map_type_to_channel (R,G,B,A)
output_channel_keys = sorted(list(inputs_map_type_to_channel.keys())) # e.g. ['B', 'G', 'R']
num_output_channels = len(output_channel_keys)
if num_output_channels == 0:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: No output channels defined in 'inputs' for '{output_map_type}'.")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': 'No output channels defined.'}
continue
# B. Perform Merge Operation
try:
if merge_settings.output_channels == 1:
if num_output_channels == 1: # Grayscale output
merged_image = np.zeros((target_dims[1], target_dims[0]), dtype=np.uint8)
else:
merged_image = np.zeros((target_dims[1], target_dims[0], merge_settings.output_channels), dtype=np.uint8)
else: # Color output
merged_image = np.zeros((target_dims[1], target_dims[0], num_output_channels), dtype=np.uint8)
except Exception as e:
logger.error(f"Error creating empty merged image for '{output_map_type}' (ID: {rule_id_hex}) with dims {target_dims} and {merge_settings.output_channels} channels: {e}")
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': f'Error creating output image canvas: {e}'
}
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error creating empty merged image for '{output_map_type}': {e}")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f'Error creating output canvas: {e}'}
continue
merge_op_failed = False
for input_map_config in merge_settings.input_maps:
source_image = loaded_input_maps[input_map_config.file_rule_id.hex]
source_channel_index = input_map_config.source_channel
target_channel_index = input_map_config.target_channel
merge_op_failed_detail = False
for i, out_channel_char in enumerate(output_channel_keys): # e.g. R, G, B
input_map_type_for_this_channel = inputs_map_type_to_channel[out_channel_char]
source_image = loaded_input_maps.get(input_map_type_for_this_channel)
source_data = None
if source_image.ndim == 2: # Grayscale
source_data = source_image
elif source_image.ndim == 3: # Multi-channel (e.g. RGB, RGBA)
if source_channel_index >= source_image.shape[2]:
logger.error(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Source channel index {source_channel_index} out of bounds for source image with shape {source_image.shape} (from Rule ID {input_map_config.file_rule_id.hex}).")
merge_op_failed = True
break
source_data = source_image[:, :, source_channel_index]
else:
logger.error(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Source image (from Rule ID {input_map_config.file_rule_id.hex}) has unexpected dimensions: {source_image.ndim}. Shape: {source_image.shape}")
merge_op_failed = True
break
source_data_this_channel = None
if source_image is not None:
if source_image.ndim == 2: # Grayscale source
source_data_this_channel = source_image
elif source_image.ndim == 3: # Color source, take the first channel (assuming it's grayscale or R of RGB)
source_data_this_channel = source_image[:,:,0]
logger.debug(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Taking channel 0 from {input_map_type_for_this_channel} for output {out_channel_char}.")
else: # Source map was not found, use default
default_val_for_channel = default_values.get(out_channel_char)
if default_val_for_channel is not None:
# Convert 0-1 float default to 0-255 uint8
source_data_this_channel = np.full((target_dims[1], target_dims[0]), int(default_val_for_channel * 255), dtype=np.uint8)
logger.info(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Using default value {default_val_for_channel} for output channel '{out_channel_char}' as input map '{input_map_type_for_this_channel}' was missing.")
else:
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Input map '{input_map_type_for_this_channel}' for output channel '{out_channel_char}' is missing and no default value provided.")
merge_op_failed_detail = True; break
if source_data is None: # Should be caught by previous checks
logger.error(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Failed to extract source_data for unknown reasons from input {input_map_config.file_rule_id.hex}.")
merge_op_failed = True
break
if source_data_this_channel is None: # Should be caught by default value logic or earlier checks
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Failed to get source data for output channel '{out_channel_char}'.")
merge_op_failed_detail = True; break
# Assign to target channel
try:
if merged_image.ndim == 2: # Output is grayscale
if merge_settings.output_channels != 1:
logger.error(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Mismatch - merged_image is 2D but output_channels is {merge_settings.output_channels}.")
merge_op_failed = True
break
merged_image = source_data # Overwrites if multiple inputs map to grayscale; consider blending or specific logic if needed
elif merged_image.ndim == 3: # Output is multi-channel
if target_channel_index >= merged_image.shape[2]:
logger.error(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Target channel index {target_channel_index} out of bounds for merged image with shape {merged_image.shape}.")
merge_op_failed = True
break
merged_image[:, :, target_channel_index] = source_data
else: # Should not happen
logger.error(f"Merge rule '{output_map_type}' (ID: {rule_id_hex}): Merged image has unexpected dimensions: {merged_image.ndim}. Shape: {merged_image.shape}")
merge_op_failed = True
break
if merged_image.ndim == 2: # Single channel output
merged_image = source_data_this_channel
else: # Multi-channel output
merged_image[:, :, i] = source_data_this_channel
except Exception as e:
logger.error(f"Error assigning source data to target channel for '{output_map_type}' (ID: {rule_id_hex}): {e}. Source shape: {source_data.shape}, Target channel: {target_channel_index}, Merged image shape: {merged_image.shape}")
merge_op_failed = True
break
if input_map_config.invert_source_channel:
if merged_image.ndim == 2:
merged_image = 255 - merged_image # Assumes uint8
elif merged_image.ndim == 3:
# Ensure we are not inverting an alpha channel if that's not desired,
# but current spec inverts the target channel data.
merged_image[:, :, target_channel_index] = 255 - merged_image[:, :, target_channel_index]
# input_map_config.default_value_if_missing:
# This was handled by all_inputs_valid check for file presence.
# If a channel is missing from a multi-channel source, that's an error in source_channel_index.
# If a file is entirely missing and a default color/value is needed for the *output channel*,
# that would be a different logic, perhaps pre-filling merged_image.
# For now, we assume if an input map is specified, it must be present and valid.
if merge_op_failed:
logger.error(f"Merge operation failed for '{output_map_type}' (ID: {rule_id_hex}).")
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': 'Error during channel packing/merge operation.'
}
continue
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error assigning data to output channel '{out_channel_char}' (index {i}): {e}")
merge_op_failed_detail = True; break
# C. Save Temporary Merged Map
# Default to PNG, or use format from merge_settings if available (future enhancement)
output_format = getattr(merge_settings, 'output_format', 'png').lower()
if output_format not in ['png', 'jpg', 'jpeg', 'tif', 'tiff', 'exr']: # Add more as ipu supports
logger.warning(f"Unsupported output_format '{output_format}' in merge_settings for '{output_map_type}' (ID: {rule_id_hex}). Defaulting to PNG.")
output_format = 'png'
temp_merged_filename = f"merged_{sanitize_filename(output_map_type)}_{rule_id_hex}.{output_format}"
if not context.engine_temp_dir:
logger.error(f"Asset {context.asset_name}: engine_temp_dir is not set. Cannot save merged map.")
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': 'engine_temp_dir not set in context.'
}
if merge_op_failed_detail:
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': 'Error during channel assignment.'}
continue
output_format = 'png' # Default, can be configured per rule later
temp_merged_filename = f"merged_{sanitize_filename(output_map_type)}_{merge_op_id}.{output_format}"
temp_merged_path = context.engine_temp_dir / temp_merged_filename
try:
save_success = ipu.save_image(temp_merged_path, merged_image)
if not save_success: raise ValueError("Save image returned false")
except Exception as e:
logger.error(f"Error saving merged image {temp_merged_path} for '{output_map_type}' (ID: {rule_id_hex}): {e}")
save_success = False
if not save_success:
logger.error(f"Failed to save temporary merged map to {temp_merged_path} for '{output_map_type}' (ID: {rule_id_hex}).")
context.merged_maps_details[rule_id_hex] = {
'map_type': output_map_type,
'status': 'Failed',
'reason': f'Failed to save merged image to {temp_merged_path}.'
}
logger.error(f"Asset {asset_name_for_log}, Merge Op ID {merge_op_id}: Error saving merged image {temp_merged_path}: {e}")
context.merged_maps_details[merge_op_id] = {'map_type': output_map_type, 'status': 'Failed', 'reason': f'Failed to save merged image: {e}'}
continue
logger.info(f"Successfully merged and saved '{output_map_type}' (ID: {rule_id_hex}) to {temp_merged_path}")
# D. Update Context
context.merged_maps_details[rule_id_hex] = {
logger.info(f"Asset {asset_name_for_log}: Successfully merged and saved '{output_map_type}' (Op ID: {merge_op_id}) to {temp_merged_path}")
context.merged_maps_details[merge_op_id] = {
'map_type': output_map_type,
'temp_merged_file': str(temp_merged_path),
'input_map_ids_used': [mc.file_rule_id.hex for mc in merge_settings.input_maps],
'input_map_files_used': input_map_paths, # Dict[rule_id_hex, path_str]
'merged_dimensions': target_dims, # (width, height)
'status': 'Processed',
'file_rule_id': rule_id_hex # For easier reverse lookup if needed
'input_map_types_used': list(inputs_map_type_to_channel.values()),
'input_map_files_used': input_map_paths,
'merged_dimensions': target_dims,
'status': 'Processed'
}
# Optional: Update context.asset_metadata['processed_files'] or similar
# This might be better handled by a later stage that finalizes files.
# For now, merged_maps_details is the primary record.
logger.info(f"Finished MapMergingStage for asset: {context.asset_name}. Merged maps: {len(context.merged_maps_details)}")
logger.info(f"Finished MapMergingStage for asset: {asset_name_for_log}. Merged maps operations attempted: {num_merge_rules_attempted}, Succeeded: {len([d for d in context.merged_maps_details.values() if d.get('status') == 'Processed'])}")
return context

View File

@ -6,7 +6,7 @@ from typing import Any, Dict
from ..asset_context import AssetProcessingContext
from .base_stage import ProcessingStage
from ....utils.path_utils import generate_path_from_pattern
from utils.path_utils import generate_path_from_pattern, sanitize_filename
logger = logging.getLogger(__name__)
@ -21,29 +21,34 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
"""
Finalizes metadata, determines output path, and saves the metadata JSON file.
"""
asset_name_for_log = "Unknown Asset"
if hasattr(context, 'asset_rule') and context.asset_rule and hasattr(context.asset_rule, 'asset_name'):
asset_name_for_log = context.asset_rule.asset_name
if not hasattr(context, 'asset_metadata') or not context.asset_metadata:
if context.status_flags.get('skip_asset'):
logger.info(
f"Asset '{context.asset_rule.name if hasattr(context, 'asset_rule') and context.asset_rule else 'Unknown'}': "
f"Asset '{asset_name_for_log}': "
f"Skipped before metadata initialization. No metadata file will be saved."
)
else:
logger.warning(
f"Asset '{context.asset_rule.name if hasattr(context, 'asset_rule') and context.asset_rule else 'Unknown'}': "
f"Asset '{asset_name_for_log}': "
f"asset_metadata not initialized. Skipping metadata finalization and save."
)
return context
# Check Skip Flag
if context.status_flags.get('skip_asset'):
context.asset_metadata['status'] = "Skipped"
context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
context.asset_metadata['notes'] = context.status_flags.get('skip_reason', 'Skipped early in pipeline')
logger.info(
f"Asset '{context.asset_rule.name}': Marked as skipped. Reason: {context.asset_metadata['notes']}"
f"Asset '{asset_name_for_log}': Marked as skipped. Reason: {context.asset_metadata['notes']}"
)
# Assuming we save metadata for skipped assets if it was initialized.
# If not, the logic to skip saving would be here or before path generation.
# However, if we are here, asset_metadata IS initialized.
# A. Finalize Metadata
context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
@ -52,7 +57,8 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
if context.asset_metadata.get('status') != "Skipped":
has_errors = any(
context.status_flags.get(error_flag)
for error_flag in ['file_processing_error', 'merge_error', 'critical_error'] # Added critical_error
for error_flag in ['file_processing_error', 'merge_error', 'critical_error',
'individual_map_processing_failed', 'metadata_save_error'] # Added more flags
)
if has_errors:
context.asset_metadata['status'] = "Failed"
@ -64,31 +70,54 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
context.asset_metadata['merged_map_details'] = getattr(context, 'merged_maps_details', {})
# (Optional) Add a list of all temporary files
context.asset_metadata['temporary_files'] = getattr(context, 'temporary_files', [])
# context.asset_metadata['temporary_files'] = getattr(context, 'temporary_files', []) # Assuming this is populated elsewhere
# B. Determine Metadata Output Path
# Ensure asset_rule and source_rule exist before accessing their names
asset_name = context.asset_rule.name if hasattr(context, 'asset_rule') and context.asset_rule else "unknown_asset"
source_rule_name = context.source_rule.name if hasattr(context, 'source_rule') and context.source_rule else "unknown_source"
# asset_name_for_log is defined at the top of the function if asset_metadata exists
source_rule_identifier_for_path = "unknown_source"
if hasattr(context, 'source_rule') and context.source_rule:
if hasattr(context.source_rule, 'supplier_identifier') and context.source_rule.supplier_identifier:
source_rule_identifier_for_path = context.source_rule.supplier_identifier
elif hasattr(context.source_rule, 'input_path') and context.source_rule.input_path:
source_rule_identifier_for_path = Path(context.source_rule.input_path).stem # Use stem of input path if no identifier
else:
source_rule_identifier_for_path = "unknown_source_details"
metadata_filename = f"{asset_name}_metadata.json"
output_path_pattern = context.asset_rule.output_path_pattern if hasattr(context, 'asset_rule') and context.asset_rule else ""
# Use the configured metadata filename from config_obj
metadata_filename_from_config = getattr(context.config_obj, 'metadata_filename', "metadata.json")
# Ensure asset_name_for_log is safe for filenames
safe_asset_name = sanitize_filename(asset_name_for_log) # asset_name_for_log is defined at the top
final_metadata_filename = f"{safe_asset_name}_{metadata_filename_from_config}"
# Handle potential missing sha5_value, defaulting to None or an empty string
sha_value = getattr(context, 'sha5_value', getattr(context, 'sha_value', None))
# Output path pattern should come from config_obj, not asset_rule
output_path_pattern_from_config = getattr(context.config_obj, 'output_directory_pattern', "[supplier]/[assetname]")
sha_value = getattr(context, 'sha5_value', None) # Prefer sha5_value if explicitly set on context
if sha_value is None: # Fallback to sha256_value if that was the intended attribute
sha_value = getattr(context, 'sha256_value', None)
token_data = {
"assetname": asset_name_for_log,
"supplier": context.effective_supplier if context.effective_supplier else source_rule_identifier_for_path,
"sourcerulename": source_rule_identifier_for_path,
"incrementingvalue": getattr(context, 'incrementing_value', None),
"sha5": sha_value, # Assuming pattern uses [sha5] or similar for sha_value
"maptype": "metadata", # Added maptype to token_data
"filename": final_metadata_filename # Added filename to token_data
# Add other tokens if your output_path_pattern_from_config expects them
}
# Clean None values, as generate_path_from_pattern might not handle them well for all tokens
token_data_cleaned = {k: v for k, v in token_data.items() if v is not None}
full_output_path = generate_path_from_pattern(
base_path=str(context.output_base_path), # Ensure base_path is a string
pattern=output_path_pattern,
asset_name=asset_name,
map_type="metadata", # Special map_type for metadata
filename=metadata_filename,
source_rule_name=source_rule_name,
incrementing_value=getattr(context, 'incrementing_value', None),
sha_value=sha_value # Changed from sha5_value to sha_value for more generality
# Generate the relative directory path using the pattern and tokens
relative_dir_path_str = generate_path_from_pattern(
pattern_string=output_path_pattern_from_config, # This pattern should resolve to a directory
token_data=token_data_cleaned
)
metadata_save_path = Path(full_output_path)
# Construct the full path by joining the base output path, the generated relative directory, and the final filename
metadata_save_path = Path(context.output_base_path) / Path(relative_dir_path_str) / Path(final_metadata_filename)
# C. Save Metadata File
try:
@ -109,10 +138,10 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
with open(metadata_save_path, 'w') as f:
json.dump(serializable_metadata, f, indent=4)
logger.info(f"Asset '{asset_name}': Metadata saved to {metadata_save_path}")
logger.info(f"Asset '{asset_name_for_log}': Metadata saved to {metadata_save_path}") # Use asset_name_for_log
context.asset_metadata['metadata_file_path'] = str(metadata_save_path)
except Exception as e:
logger.error(f"Asset '{asset_name}': Failed to save metadata to {metadata_save_path}. Error: {e}")
logger.error(f"Asset '{asset_name_for_log}': Failed to save metadata to {metadata_save_path}. Error: {e}") # Use asset_name_for_log
context.asset_metadata['status'] = "Failed (Metadata Save Error)"
context.status_flags['metadata_save_error'] = True

View File

@ -1,8 +1,8 @@
import datetime
import logging
from ..base_stage import ProcessingStage
from ...asset_context import AssetProcessingContext # Adjusted import path assuming asset_context is in processing.pipeline
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext # Adjusted import path assuming asset_context is in processing.pipeline
# If AssetProcessingContext is directly under 'processing', the import would be:
# from ...asset_context import AssetProcessingContext
# Based on the provided file structure, asset_context.py is in processing/pipeline/
@ -74,8 +74,6 @@ from ...asset_context import AssetProcessingContext # Adjusted import path assum
# I will use the imports that align with the provided file structure.
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
logger = logging.getLogger(__name__)
@ -97,10 +95,10 @@ class MetadataInitializationStage(ProcessingStage):
The modified AssetProcessingContext.
"""
if context.status_flags.get('skip_asset', False):
logger.debug(f"Asset '{context.asset_rule.name if context.asset_rule else 'Unknown'}': Skipping metadata initialization as 'skip_asset' is True.")
logger.debug(f"Asset '{context.asset_rule.asset_name if context.asset_rule else 'Unknown'}': Skipping metadata initialization as 'skip_asset' is True.")
return context
logger.debug(f"Asset '{context.asset_rule.name}': Initializing metadata.")
logger.debug(f"Asset '{context.asset_rule.asset_name if context.asset_rule else 'Unknown'}': Initializing metadata.")
context.asset_metadata = {}
context.processed_maps_details = {}
@ -108,12 +106,19 @@ class MetadataInitializationStage(ProcessingStage):
# Populate Initial asset_metadata
if context.asset_rule:
context.asset_metadata['asset_name'] = context.asset_rule.name
context.asset_metadata['asset_id'] = str(context.asset_rule.id)
context.asset_metadata['source_path'] = str(context.asset_rule.source_path)
context.asset_metadata['output_path_pattern'] = context.asset_rule.output_path_pattern
context.asset_metadata['tags'] = list(context.asset_rule.tags) if context.asset_rule.tags else []
context.asset_metadata['custom_fields'] = dict(context.asset_rule.custom_fields) if context.asset_rule.custom_fields else {}
context.asset_metadata['asset_name'] = context.asset_rule.asset_name
# Attempt to get 'id' from common_metadata or use asset_name as a fallback
asset_id_val = context.asset_rule.common_metadata.get('id', context.asset_rule.common_metadata.get('asset_id'))
if asset_id_val is None:
logger.warning(f"Asset '{context.asset_rule.asset_name}': No 'id' or 'asset_id' found in common_metadata. Using asset_name as asset_id.")
asset_id_val = context.asset_rule.asset_name
context.asset_metadata['asset_id'] = str(asset_id_val)
# Assuming source_path, output_path_pattern, tags, custom_fields might also be in common_metadata
context.asset_metadata['source_path'] = str(context.asset_rule.common_metadata.get('source_path', 'N/A'))
context.asset_metadata['output_path_pattern'] = context.asset_rule.common_metadata.get('output_path_pattern', 'N/A')
context.asset_metadata['tags'] = list(context.asset_rule.common_metadata.get('tags', []))
context.asset_metadata['custom_fields'] = dict(context.asset_rule.common_metadata.get('custom_fields', {}))
else:
# Handle cases where asset_rule might be None, though typically it should be set
logger.warning("AssetRule is not set in context during metadata initialization.")
@ -126,8 +131,13 @@ class MetadataInitializationStage(ProcessingStage):
if context.source_rule:
context.asset_metadata['source_rule_name'] = context.source_rule.name
context.asset_metadata['source_rule_id'] = str(context.source_rule.id)
# SourceRule also doesn't have 'name' or 'id' directly.
# Using 'input_path' as a proxy for name, and a placeholder for id.
source_rule_name_val = context.source_rule.input_path if context.source_rule.input_path else "Unknown Source Rule Path"
source_rule_id_val = context.source_rule.high_level_sorting_parameters.get('id', "N/A_SR_ID") # Check high_level_sorting_parameters
logger.debug(f"SourceRule: using input_path '{source_rule_name_val}' as name, and '{source_rule_id_val}' as id.")
context.asset_metadata['source_rule_name'] = source_rule_name_val
context.asset_metadata['source_rule_id'] = str(source_rule_id_val)
else:
logger.warning("SourceRule is not set in context during metadata initialization.")
context.asset_metadata['source_rule_name'] = "Unknown Source Rule"

View File

@ -3,11 +3,11 @@ import numpy as np
from pathlib import Path
from typing import List
from ..base_stage import ProcessingStage
from ...asset_context import AssetProcessingContext
from .....rule_structure import FileRule
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from rule_structure import FileRule
from ...utils import image_processing_utils as ipu
from .....utils.path_utils import sanitize_filename
from utils.path_utils import sanitize_filename
logger = logging.getLogger(__name__)
@ -23,71 +23,70 @@ class NormalMapGreenChannelStage(ProcessingStage):
performs inversion if needed, saves a new temporary file, and updates
the AssetProcessingContext.
"""
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
if context.status_flags.get('skip_asset'):
logger.debug(f"Asset '{context.asset_rule.name}': Skipping NormalMapGreenChannelStage due to skip_asset flag.")
logger.debug(f"Asset '{asset_name_for_log}': Skipping NormalMapGreenChannelStage due to skip_asset flag.")
return context
if not context.files_to_process or not context.processed_maps_details:
if not context.processed_maps_details: # Check processed_maps_details primarily
logger.debug(
f"Asset '{context.asset_rule.name}': No files to process or processed_maps_details empty in NormalMapGreenChannelStage. Skipping."
f"Asset '{asset_name_for_log}': No processed_maps_details in NormalMapGreenChannelStage. Skipping."
)
return context
new_files_to_process: List[FileRule] = []
processed_a_normal_map = False
for file_rule in context.files_to_process:
if file_rule.map_type == "NORMAL":
# Iterate through processed maps, as FileRule objects don't have IDs directly
for map_id_hex, map_details in context.processed_maps_details.items():
if map_details.get('map_type') == "NORMAL" and map_details.get('status') == 'Processed':
# Check configuration for inversion
# Assuming a global setting for now.
# This key should exist in the Configuration object's general_settings.
should_invert = context.config_obj.general_settings.get('invert_normal_map_green_channel_globally', False)
# Assuming general_settings is an attribute of config_obj and might be a dict or an object
should_invert = False
if hasattr(context.config_obj, 'general_settings'):
if isinstance(context.config_obj.general_settings, dict):
should_invert = context.config_obj.general_settings.get('invert_normal_map_green_channel_globally', False)
elif hasattr(context.config_obj.general_settings, 'invert_normal_map_green_channel_globally'):
should_invert = getattr(context.config_obj.general_settings, 'invert_normal_map_green_channel_globally', False)
original_temp_path_str = map_details.get('temp_processed_file')
if not original_temp_path_str:
logger.warning(f"Asset '{asset_name_for_log}': Normal map (ID: {map_id_hex}) missing 'temp_processed_file' in details. Skipping.")
continue
original_temp_path = Path(original_temp_path_str)
original_filename_for_log = original_temp_path.name
if not should_invert:
logger.debug(
f"Asset '{context.asset_rule.name}': Normal map green channel inversion not enabled globally. "
f"Skipping for {file_rule.filename_pattern} (ID: {file_rule.id.hex})."
f"Asset '{asset_name_for_log}': Normal map green channel inversion not enabled. "
f"Skipping for {original_filename_for_log} (ID: {map_id_hex})."
)
new_files_to_process.append(file_rule)
continue
# Get the temporary processed file path
map_details = context.processed_maps_details.get(file_rule.id.hex)
if not map_details or map_details.get('status') != 'Processed' or not map_details.get('temp_processed_file'):
logger.warning(
f"Asset '{context.asset_rule.name}': Normal map {file_rule.filename_pattern} (ID: {file_rule.id.hex}) "
f"not found in processed_maps_details or not marked as 'Processed'. Cannot invert green channel."
)
new_files_to_process.append(file_rule)
continue
original_temp_path = Path(map_details['temp_processed_file'])
if not original_temp_path.exists():
logger.error(
f"Asset '{context.asset_rule.name}': Temporary file {original_temp_path} for normal map "
f"{file_rule.filename_pattern} (ID: {file_rule.id.hex}) does not exist. Cannot invert green channel."
f"Asset '{asset_name_for_log}': Temporary file {original_temp_path} for normal map "
f"{original_filename_for_log} (ID: {map_id_hex}) does not exist. Cannot invert green channel."
)
new_files_to_process.append(file_rule)
continue
image_data = ipu.load_image(original_temp_path)
if image_data is None:
logger.error(
f"Asset '{context.asset_rule.name}': Failed to load image from {original_temp_path} "
f"for normal map {file_rule.filename_pattern} (ID: {file_rule.id.hex})."
f"Asset '{asset_name_for_log}': Failed to load image from {original_temp_path} "
f"for normal map {original_filename_for_log} (ID: {map_id_hex})."
)
new_files_to_process.append(file_rule)
continue
if image_data.ndim != 3 or image_data.shape[2] < 2: # Must have at least R, G channels
logger.error(
f"Asset '{context.asset_rule.name}': Image {original_temp_path} for normal map "
f"{file_rule.filename_pattern} (ID: {file_rule.id.hex}) is not a valid RGB/normal map "
f"Asset '{asset_name_for_log}': Image {original_temp_path} for normal map "
f"{original_filename_for_log} (ID: {map_id_hex}) is not a valid RGB/normal map "
f"(ndim={image_data.ndim}, channels={image_data.shape[2] if image_data.ndim == 3 else 'N/A'}) "
f"for green channel inversion."
)
new_files_to_process.append(file_rule)
continue
# Perform Green Channel Inversion
@ -100,55 +99,55 @@ class NormalMapGreenChannelStage(ProcessingStage):
modified_image_data[:, :, 1] = max_val - modified_image_data[:, :, 1]
else:
logger.error(
f"Asset '{context.asset_rule.name}': Unsupported image data type "
f"Asset '{asset_name_for_log}': Unsupported image data type "
f"{modified_image_data.dtype} for normal map {original_temp_path}. Cannot invert green channel."
)
new_files_to_process.append(file_rule)
continue
except IndexError:
logger.error(
f"Asset '{context.asset_rule.name}': Image {original_temp_path} for normal map "
f"{file_rule.filename_pattern} (ID: {file_rule.id.hex}) does not have a green channel (index 1) "
f"Asset '{asset_name_for_log}': Image {original_temp_path} for normal map "
f"{original_filename_for_log} (ID: {map_id_hex}) does not have a green channel (index 1) "
f"or has unexpected dimensions ({modified_image_data.shape}). Cannot invert."
)
new_files_to_process.append(file_rule)
continue
# Save New Temporary (Modified Normal) Map
new_temp_filename = f"normal_g_inv_{sanitize_filename(file_rule.map_type)}_{file_rule.id.hex}{original_temp_path.suffix}"
# Sanitize map_details.get('map_type') in case it's missing, though it should be 'NORMAL' here
map_type_for_filename = sanitize_filename(map_details.get('map_type', 'NORMAL'))
new_temp_filename = f"normal_g_inv_{map_type_for_filename}_{map_id_hex}{original_temp_path.suffix}"
new_temp_path = context.engine_temp_dir / new_temp_filename
save_success = ipu.save_image(new_temp_path, modified_image_data)
if save_success:
logger.info(
f"Asset '{context.asset_rule.name}': Inverted green channel for NORMAL map "
f"{original_temp_path.name}, saved to {new_temp_path.name}."
f"Asset '{asset_name_for_log}': Inverted green channel for NORMAL map "
f"{original_filename_for_log}, saved to {new_temp_path.name}."
)
# Update processed_maps_details
context.processed_maps_details[file_rule.id.hex]['temp_processed_file'] = str(new_temp_path)
current_notes = context.processed_maps_details[file_rule.id.hex].get('notes', '')
context.processed_maps_details[file_rule.id.hex]['notes'] = \
# Update processed_maps_details for this map_id_hex
context.processed_maps_details[map_id_hex]['temp_processed_file'] = str(new_temp_path)
current_notes = context.processed_maps_details[map_id_hex].get('notes', '')
context.processed_maps_details[map_id_hex]['notes'] = \
f"{current_notes}; Green channel inverted by NormalMapGreenChannelStage".strip('; ')
new_files_to_process.append(file_rule) # Add original rule, it now points to modified data
processed_a_normal_map = True
else:
logger.error(
f"Asset '{context.asset_rule.name}': Failed to save inverted normal map to {new_temp_path} "
f"for original {original_temp_path.name}."
f"Asset '{asset_name_for_log}': Failed to save inverted normal map to {new_temp_path} "
f"for original {original_filename_for_log}."
)
new_files_to_process.append(file_rule) # Add original rule, as processing failed
else:
# Not a normal map, just pass it through
new_files_to_process.append(file_rule)
# No need to explicitly manage new_files_to_process list in this loop,
# as we are modifying the temp_processed_file path within processed_maps_details.
# The existing FileRule objects in context.files_to_process (if any) would
# be linked to these details by a previous stage (e.g. IndividualMapProcessing)
# if that stage populates a 'file_rule_id' in map_details.
context.files_to_process = new_files_to_process
# context.files_to_process remains unchanged by this stage directly,
# as we modify the data pointed to by processed_maps_details.
if processed_a_normal_map:
logger.info(f"Asset '{context.asset_rule.name}': NormalMapGreenChannelStage processed relevant normal maps.")
logger.info(f"Asset '{asset_name_for_log}': NormalMapGreenChannelStage processed relevant normal maps.")
else:
logger.debug(f"Asset '{context.asset_rule.name}': No normal maps found or processed in NormalMapGreenChannelStage.")
logger.debug(f"Asset '{asset_name_for_log}': No normal maps found or processed in NormalMapGreenChannelStage.")
return context

View File

@ -3,10 +3,10 @@ import shutil
from pathlib import Path
from typing import List, Dict, Optional
from ..base_stage import ProcessingStage
from ...asset_context import AssetProcessingContext
from ....utils.path_utils import generate_path_from_pattern, sanitize_filename
from ....config import FileRule, MergeRule # Assuming these are needed for type hints if not directly in context
from .base_stage import ProcessingStage
from ..asset_context import AssetProcessingContext
from utils.path_utils import generate_path_from_pattern, sanitize_filename
from rule_structure import FileRule # Assuming these are needed for type hints if not directly in context
logger = logging.getLogger(__name__)
@ -21,135 +21,218 @@ class OutputOrganizationStage(ProcessingStage):
Copies temporary processed and merged files to their final output locations
based on path patterns and updates AssetProcessingContext.
"""
logger.debug(f"Asset '{context.asset_rule.name}': Starting output organization stage.")
asset_name_for_log = context.asset_rule.asset_name if hasattr(context, 'asset_rule') and context.asset_rule else "Unknown Asset"
logger.debug(f"Asset '{asset_name_for_log}': Starting output organization stage.")
if context.status_flags.get('skip_asset'):
logger.info(f"Asset '{context.asset_rule.name}': Output organization skipped as 'skip_asset' is True.")
logger.info(f"Asset '{asset_name_for_log}': Output organization skipped as 'skip_asset' is True.")
return context
current_status = context.asset_metadata.get('status', '')
if current_status.startswith("Failed") or current_status == "Skipped":
logger.info(f"Asset '{context.asset_rule.name}': Output organization skipped due to prior status: {current_status}.")
logger.info(f"Asset '{asset_name_for_log}': Output organization skipped due to prior status: {current_status}.")
return context
final_output_files: List[str] = []
# Ensure config_obj and general_settings are present, provide default for overwrite_existing if not
overwrite_existing = False
if context.config_obj and hasattr(context.config_obj, 'general_settings'):
overwrite_existing = context.config_obj.general_settings.overwrite_existing
# Correctly access general_settings and overwrite_existing from config_obj
if hasattr(context.config_obj, 'general_settings'):
if isinstance(context.config_obj.general_settings, dict):
overwrite_existing = context.config_obj.general_settings.get('overwrite_existing', False)
elif hasattr(context.config_obj.general_settings, 'overwrite_existing'): # If general_settings is an object
overwrite_existing = getattr(context.config_obj.general_settings, 'overwrite_existing', False)
else:
logger.warning(f"Asset '{context.asset_rule.name}': config_obj.general_settings not found, defaulting overwrite_existing to False.")
logger.warning(f"Asset '{asset_name_for_log}': config_obj.general_settings not found, defaulting overwrite_existing to False.")
output_dir_pattern = getattr(context.config_obj, 'output_directory_pattern', "[supplier]/[assetname]")
output_filename_pattern_config = getattr(context.config_obj, 'output_filename_pattern', "[assetname]_[maptype]_[resolution].[ext]")
# A. Organize Processed Individual Maps
if context.processed_maps_details:
logger.debug(f"Asset '{context.asset_rule.name}': Organizing {len(context.processed_maps_details)} processed individual map(s).")
for file_rule_id, details in context.processed_maps_details.items():
logger.debug(f"Asset '{asset_name_for_log}': Organizing {len(context.processed_maps_details)} processed individual map(s).")
for processed_map_key, details in context.processed_maps_details.items(): # Use processed_map_key
if details.get('status') != 'Processed' or not details.get('temp_processed_file'):
logger.debug(f"Asset '{context.asset_rule.name}': Skipping file_rule_id '{file_rule_id}' due to status '{details.get('status')}' or missing temp file.")
logger.debug(f"Asset '{asset_name_for_log}': Skipping processed map key '{processed_map_key}' due to status '{details.get('status')}' or missing temp file.")
continue
temp_file_path = Path(details['temp_processed_file'])
map_type = details['map_type']
map_type = details.get('map_type', 'unknown_map_type')
resolution_str = details.get('processed_resolution_name', details.get('original_resolution_name', 'resX'))
output_filename = f"{context.asset_rule.name}_{sanitize_filename(map_type)}{temp_file_path.suffix}"
if context.asset_rule and context.asset_rule.file_rules:
current_file_rule: Optional[FileRule] = next(
(fr for fr in context.asset_rule.file_rules if fr.id == file_rule_id), None
)
if current_file_rule and current_file_rule.output_filename_pattern:
output_filename = current_file_rule.output_filename_pattern
# Construct token_data for path generation
token_data = {
"assetname": asset_name_for_log,
"supplier": context.effective_supplier or "DefaultSupplier",
"maptype": map_type,
"resolution": resolution_str,
"ext": temp_file_path.suffix.lstrip('.'), # Get extension without dot
"incrementingvalue": getattr(context, 'incrementing_value', None),
"sha5": getattr(context, 'sha5_value', None)
}
token_data_cleaned = {k: v for k, v in token_data.items() if v is not None}
# Generate filename first using its pattern
# output_filename = f"{asset_name_for_log}_{sanitize_filename(map_type)}{temp_file_path.suffix}" # Old way
output_filename = generate_path_from_pattern(output_filename_pattern_config, token_data_cleaned)
try:
final_path_str = generate_path_from_pattern(
base_path=str(context.output_base_path),
pattern=context.asset_rule.output_path_pattern,
asset_name=context.asset_rule.name,
map_type=map_type,
filename=output_filename,
source_rule_name=context.source_rule.name if context.source_rule else "DefaultSource",
incrementing_value=str(context.incrementing_value) if context.incrementing_value is not None else None,
sha5_value=context.sha5_value
relative_dir_path_str = generate_path_from_pattern(
pattern_string=output_dir_pattern,
token_data=token_data_cleaned
)
final_path = Path(final_path_str)
final_path = Path(context.output_base_path) / Path(relative_dir_path_str) / Path(output_filename)
final_path.parent.mkdir(parents=True, exist_ok=True)
if final_path.exists() and not overwrite_existing:
logger.info(f"Asset '{context.asset_rule.name}': Output file {final_path} exists and overwrite is disabled. Skipping copy.")
logger.info(f"Asset '{asset_name_for_log}': Output file {final_path} exists and overwrite is disabled. Skipping copy.")
else:
shutil.copy2(temp_file_path, final_path)
logger.info(f"Asset '{context.asset_rule.name}': Copied {temp_file_path} to {final_path}")
logger.info(f"Asset '{asset_name_for_log}': Copied {temp_file_path} to {final_path}")
final_output_files.append(str(final_path))
context.processed_maps_details[file_rule_id]['final_output_path'] = str(final_path)
context.processed_maps_details[file_rule_id]['status'] = 'Organized' # Or some other status indicating completion
context.processed_maps_details[processed_map_key]['final_output_path'] = str(final_path)
context.processed_maps_details[processed_map_key]['status'] = 'Organized'
except Exception as e:
logger.error(f"Asset '{context.asset_rule.name}': Failed to copy {temp_file_path} to {final_path_str if 'final_path_str' in locals() else 'unknown destination'} for file_rule_id '{file_rule_id}'. Error: {e}", exc_info=True)
logger.error(f"Asset '{asset_name_for_log}': Failed to copy {temp_file_path} to destination for processed map key '{processed_map_key}'. Error: {e}", exc_info=True)
context.status_flags['output_organization_error'] = True
context.asset_metadata['status'] = "Failed (Output Organization Error)"
# Optionally update status in details as well
context.processed_maps_details[file_rule_id]['status'] = 'Organization Failed'
context.processed_maps_details[processed_map_key]['status'] = 'Organization Failed'
else:
logger.debug(f"Asset '{context.asset_rule.name}': No processed individual maps to organize.")
logger.debug(f"Asset '{asset_name_for_log}': No processed individual maps to organize.")
# B. Organize Merged Maps
if context.merged_maps_details:
logger.debug(f"Asset '{context.asset_rule.name}': Organizing {len(context.merged_maps_details)} merged map(s).")
for merge_rule_id, details in context.merged_maps_details.items():
logger.debug(f"Asset '{asset_name_for_log}': Organizing {len(context.merged_maps_details)} merged map(s).")
for merge_op_id, details in context.merged_maps_details.items(): # Use merge_op_id
if details.get('status') != 'Processed' or not details.get('temp_merged_file'):
logger.debug(f"Asset '{context.asset_rule.name}': Skipping merge_rule_id '{merge_rule_id}' due to status '{details.get('status')}' or missing temp file.")
logger.debug(f"Asset '{asset_name_for_log}': Skipping merge op id '{merge_op_id}' due to status '{details.get('status')}' or missing temp file.")
continue
temp_file_path = Path(details['temp_merged_file'])
map_type = details['map_type'] # This is the output_map_type of the merge rule
map_type = details.get('map_type', 'unknown_merged_map') # This is the output_map_type of the merge rule
# Merged maps might not have a simple 'resolution' token like individual maps.
# We'll use a placeholder or derive if possible.
resolution_str = details.get('merged_resolution_name', 'mergedRes')
output_filename = f"{context.asset_rule.name}_{sanitize_filename(map_type)}{temp_file_path.suffix}"
if context.asset_rule and context.asset_rule.merge_rules:
current_merge_rule: Optional[MergeRule] = next(
(mr for mr in context.asset_rule.merge_rules if mr.id == merge_rule_id), None
)
if current_merge_rule and current_merge_rule.output_filename_pattern:
output_filename = current_merge_rule.output_filename_pattern
token_data_merged = {
"assetname": asset_name_for_log,
"supplier": context.effective_supplier or "DefaultSupplier",
"maptype": map_type,
"resolution": resolution_str,
"ext": temp_file_path.suffix.lstrip('.'),
"incrementingvalue": getattr(context, 'incrementing_value', None),
"sha5": getattr(context, 'sha5_value', None)
}
token_data_merged_cleaned = {k: v for k, v in token_data_merged.items() if v is not None}
output_filename_merged = generate_path_from_pattern(output_filename_pattern_config, token_data_merged_cleaned)
try:
final_path_str = generate_path_from_pattern(
base_path=str(context.output_base_path),
pattern=context.asset_rule.output_path_pattern,
asset_name=context.asset_rule.name,
map_type=map_type,
filename=output_filename,
source_rule_name=context.source_rule.name if context.source_rule else "DefaultSource",
incrementing_value=str(context.incrementing_value) if context.incrementing_value is not None else None,
sha5_value=context.sha5_value
relative_dir_path_str_merged = generate_path_from_pattern(
pattern_string=output_dir_pattern,
token_data=token_data_merged_cleaned
)
final_path = Path(final_path_str)
final_path.parent.mkdir(parents=True, exist_ok=True)
final_path_merged = Path(context.output_base_path) / Path(relative_dir_path_str_merged) / Path(output_filename_merged)
final_path_merged.parent.mkdir(parents=True, exist_ok=True)
if final_path.exists() and not overwrite_existing:
logger.info(f"Asset '{context.asset_rule.name}': Output file {final_path} exists and overwrite is disabled. Skipping copy for merged map.")
if final_path_merged.exists() and not overwrite_existing:
logger.info(f"Asset '{asset_name_for_log}': Output file {final_path_merged} exists and overwrite is disabled. Skipping copy for merged map.")
else:
shutil.copy2(temp_file_path, final_path)
logger.info(f"Asset '{context.asset_rule.name}': Copied merged map {temp_file_path} to {final_path}")
final_output_files.append(str(final_path))
shutil.copy2(temp_file_path, final_path_merged)
logger.info(f"Asset '{asset_name_for_log}': Copied merged map {temp_file_path} to {final_path_merged}")
final_output_files.append(str(final_path_merged))
context.merged_maps_details[merge_rule_id]['final_output_path'] = str(final_path)
context.merged_maps_details[merge_rule_id]['status'] = 'Organized'
context.merged_maps_details[merge_op_id]['final_output_path'] = str(final_path_merged)
context.merged_maps_details[merge_op_id]['status'] = 'Organized'
except Exception as e:
logger.error(f"Asset '{context.asset_rule.name}': Failed to copy merged map {temp_file_path} to {final_path_str if 'final_path_str' in locals() else 'unknown destination'} for merge_rule_id '{merge_rule_id}'. Error: {e}", exc_info=True)
logger.error(f"Asset '{asset_name_for_log}': Failed to copy merged map {temp_file_path} to destination for merge op id '{merge_op_id}'. Error: {e}", exc_info=True)
context.status_flags['output_organization_error'] = True
context.asset_metadata['status'] = "Failed (Output Organization Error)"
context.merged_maps_details[merge_rule_id]['status'] = 'Organization Failed'
context.merged_maps_details[merge_op_id]['status'] = 'Organization Failed'
else:
logger.debug(f"Asset '{context.asset_rule.name}': No merged maps to organize.")
logger.debug(f"Asset '{asset_name_for_log}': No merged maps to organize.")
# C. Organize Extra Files (e.g., previews, text files)
logger.debug(f"Asset '{asset_name_for_log}': Checking for EXTRA files to organize.")
extra_files_organized_count = 0
if hasattr(context, 'files_to_process') and context.files_to_process:
extra_subdir_name = getattr(context.config_obj, 'extra_files_subdir', 'Extra') # Default to 'Extra'
for file_rule in context.files_to_process:
if file_rule.item_type == 'EXTRA':
source_file_path = context.workspace_path / file_rule.file_path
if not source_file_path.is_file():
logger.warning(f"Asset '{asset_name_for_log}': EXTRA file '{source_file_path}' not found. Skipping.")
continue
# Basic token data for the asset's base output directory
# We don't use map_type, resolution, or ext for the base directory of extras.
# However, generate_path_from_pattern might expect them or handle their absence.
# For the base asset directory, only assetname and supplier are typically primary.
base_token_data = {
"assetname": asset_name_for_log,
"supplier": context.effective_supplier or "DefaultSupplier",
# Add other tokens if your output_directory_pattern uses them at the asset level
"incrementingvalue": getattr(context, 'incrementing_value', None),
"sha5": getattr(context, 'sha5_value', None)
}
base_token_data_cleaned = {k: v for k, v in base_token_data.items() if v is not None}
try:
asset_base_output_dir_str = generate_path_from_pattern(
pattern_string=output_dir_pattern, # Uses the same pattern as other maps for base dir
token_data=base_token_data_cleaned
)
# Destination: <output_base_path>/<asset_base_output_dir_str>/<extra_subdir_name>/<original_filename>
final_dest_path = (Path(context.output_base_path) /
Path(asset_base_output_dir_str) /
Path(extra_subdir_name) /
source_file_path.name) # Use original filename
final_dest_path.parent.mkdir(parents=True, exist_ok=True)
if final_dest_path.exists() and not overwrite_existing:
logger.info(f"Asset '{asset_name_for_log}': EXTRA file destination {final_dest_path} exists and overwrite is disabled. Skipping copy.")
else:
shutil.copy2(source_file_path, final_dest_path)
logger.info(f"Asset '{asset_name_for_log}': Copied EXTRA file {source_file_path} to {final_dest_path}")
final_output_files.append(str(final_dest_path))
extra_files_organized_count += 1
# Optionally, add more detailed tracking for extra files in context.asset_metadata
# For example:
# if 'extra_files_details' not in context.asset_metadata:
# context.asset_metadata['extra_files_details'] = []
# context.asset_metadata['extra_files_details'].append({
# 'source_path': str(source_file_path),
# 'destination_path': str(final_dest_path),
# 'status': 'Organized'
# })
except Exception as e:
logger.error(f"Asset '{asset_name_for_log}': Failed to copy EXTRA file {source_file_path} to destination. Error: {e}", exc_info=True)
context.status_flags['output_organization_error'] = True
context.asset_metadata['status'] = "Failed (Output Organization Error - Extra Files)"
# Optionally, update status for the specific file_rule if tracked
if extra_files_organized_count > 0:
logger.info(f"Asset '{asset_name_for_log}': Successfully organized {extra_files_organized_count} EXTRA file(s).")
else:
logger.debug(f"Asset '{asset_name_for_log}': No EXTRA files were processed or found to organize.")
context.asset_metadata['final_output_files'] = final_output_files
if context.status_flags.get('output_organization_error'):
logger.error(f"Asset '{context.asset_rule.name}': Output organization encountered errors. Status: {context.asset_metadata['status']}")
logger.error(f"Asset '{asset_name_for_log}': Output organization encountered errors. Status: {context.asset_metadata['status']}")
else:
logger.info(f"Asset '{context.asset_rule.name}': Output organization complete. {len(final_output_files)} files placed.")
logger.info(f"Asset '{asset_name_for_log}': Output organization complete. {len(final_output_files)} files placed.")
logger.debug(f"Asset '{context.asset_rule.name}': Output organization stage finished.")
logger.debug(f"Asset '{asset_name_for_log}': Output organization stage finished.")
return context

View File

@ -20,29 +20,29 @@ class SupplierDeterminationStage(ProcessingStage):
"""
effective_supplier = None
logger = logging.getLogger(__name__) # Using a logger specific to this module
asset_name_for_log = context.asset_rule.asset_name if context.asset_rule else "Unknown Asset"
# 1. Check asset_rule.supplier_override
if context.asset_rule and context.asset_rule.supplier_override:
effective_supplier = context.asset_rule.supplier_override
logger.debug(f"Asset '{context.asset_rule.name}': Supplier override found: '{effective_supplier}'.")
# 2. If not overridden, check source_rule.supplier
if not effective_supplier and context.source_rule and context.source_rule.supplier:
effective_supplier = context.source_rule.supplier
logger.debug(f"Asset '{context.asset_rule.name if context.asset_rule else 'Unknown'}': Source rule supplier found: '{effective_supplier}'.")
# 1. Check source_rule.supplier_override (highest precedence)
if context.source_rule and context.source_rule.supplier_override:
effective_supplier = context.source_rule.supplier_override
logger.debug(f"Asset '{asset_name_for_log}': Supplier override from source_rule found: '{effective_supplier}'.")
# 2. If not overridden, check source_rule.supplier_identifier
elif context.source_rule and context.source_rule.supplier_identifier:
effective_supplier = context.source_rule.supplier_identifier
logger.debug(f"Asset '{asset_name_for_log}': Supplier identifier from source_rule found: '{effective_supplier}'.")
# 3. Validation
if not effective_supplier:
asset_name = context.asset_rule.name if context.asset_rule else "Unknown Asset"
logger.error(f"Asset '{asset_name}': No supplier defined in asset rule or source rule.")
logger.error(f"Asset '{asset_name_for_log}': No supplier defined in source_rule (override or identifier).")
context.effective_supplier = None
if 'status_flags' not in context: # Ensure status_flags exists
context.status_flags = {}
context.status_flags['supplier_error'] = True
elif context.config_obj and effective_supplier not in context.config_obj.suppliers:
asset_name = context.asset_rule.name if context.asset_rule else "Unknown Asset"
# Assuming context.config_obj.suppliers is a valid way to get the list of configured suppliers.
# This might need further investigation if errors occur here later.
elif context.config_obj and hasattr(context.config_obj, 'suppliers') and effective_supplier not in context.config_obj.suppliers:
logger.warning(
f"Asset '{asset_name}': Supplier '{effective_supplier}' not found in global supplier configuration. "
f"Asset '{asset_name_for_log}': Determined supplier '{effective_supplier}' not found in global supplier configuration. "
f"Available: {list(context.config_obj.suppliers.keys()) if context.config_obj.suppliers else 'None'}"
)
context.effective_supplier = None
@ -51,11 +51,10 @@ class SupplierDeterminationStage(ProcessingStage):
context.status_flags['supplier_error'] = True
else:
context.effective_supplier = effective_supplier
asset_name = context.asset_rule.name if context.asset_rule else "Unknown Asset"
logger.info(f"Asset '{asset_name}': Effective supplier set to '{effective_supplier}'.")
# Optionally clear the error flag if previously set and now resolved, though current logic doesn't show this path.
# if 'status_flags' in context and 'supplier_error' in context.status_flags:
# del context.status_flags['supplier_error']
logger.info(f"Asset '{asset_name_for_log}': Effective supplier set to '{effective_supplier}'.")
# Optionally clear the error flag if previously set and now resolved.
if 'supplier_error' in context.status_flags:
del context.status_flags['supplier_error']
return context

View File

@ -25,6 +25,23 @@ def get_nearest_pot(value: int) -> int:
else:
return upper_pot
def get_nearest_power_of_two_downscale(value: int) -> int:
"""
Finds the nearest power of two that is less than or equal to the given value.
If the value is already a power of two, it returns the value itself.
Returns 1 if the value is less than 1.
"""
if value &lt; 1:
return 1
if is_power_of_two(value):
return value
# Find the largest power of two strictly less than value,
# unless value itself is POT.
# (1 &lt;&lt; (value.bit_length() - 1)) achieves this.
# Example: value=7 (0111, bl=3), 1&lt;&lt;2 = 4.
# Example: value=8 (1000, bl=4), 1&lt;&lt;3 = 8.
# Example: value=9 (1001, bl=4), 1&lt;&lt;3 = 8.
return 1 &lt;&lt; (value.bit_length() - 1)
# --- Dimension Calculation ---
def calculate_target_dimensions(

View File

@ -12,7 +12,8 @@ from typing import List, Dict, Tuple, Optional, Set
try:
import cv2
import numpy as np
except ImportError:
except ImportError as e:
log.error(f"Failed to import cv2 or numpy in processing_engine.py: {e}", exc_info=True)
print("ERROR: Missing required image processing libraries. Please install opencv-python and numpy:")
print("pip install opencv-python numpy")
# Allow import to fail but log error; execution will likely fail later
@ -25,8 +26,11 @@ try:
from configuration import Configuration, ConfigurationError
from rule_structure import SourceRule, AssetRule, FileRule
from utils.path_utils import generate_path_from_pattern, sanitize_filename
from utils import image_processing_utils as ipu # Added import
except ImportError:
from processing.utils import image_processing_utils as ipu # Corrected import
except ImportError as e:
# Temporarily print to console as log might not be initialized yet
print(f"ERROR during initial imports in processing_engine.py: {e}")
# log.error(f"Failed to import Configuration or rule_structure classes in processing_engine.py: {e}", exc_info=True) # Log will be used after init
print("ERROR: Cannot import Configuration or rule_structure classes.")
print("Ensure configuration.py and rule_structure.py are in the same directory or Python path.")
# Allow import to fail but log error; execution will likely fail later
@ -36,6 +40,12 @@ except ImportError:
FileRule = None
# Initialize logger early
log = logging.getLogger(__name__)
# Basic config if logger hasn't been set up elsewhere (e.g., during testing)
if not log.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
# Use logger defined in main.py (or configure one here if run standalone)
from processing.pipeline.orchestrator import PipelineOrchestrator
@ -51,11 +61,6 @@ from processing.pipeline.stages.individual_map_processing import IndividualMapPr
from processing.pipeline.stages.map_merging import MapMergingStage
from processing.pipeline.stages.metadata_finalization_save import MetadataFinalizationAndSaveStage
from processing.pipeline.stages.output_organization import OutputOrganizationStage
log = logging.getLogger(__name__)
# Basic config if logger hasn't been set up elsewhere (e.g., during testing)
if not log.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
# --- Custom Exception ---
class ProcessingEngineError(Exception):

View File

@ -6,7 +6,7 @@ import numpy as np
from processing.pipeline.stages.alpha_extraction_to_mask import AlphaExtractionToMaskStage
from processing.pipeline.asset_context import AssetProcessingContext
from rule_structure import AssetRule, SourceRule, FileRule, TransformSettings
from rule_structure import AssetRule, SourceRule, FileRule
from configuration import Configuration, GeneralSettings
import processing.utils.image_processing_utils as ipu # Ensure ipu is available for mocking

View File

@ -7,7 +7,7 @@ from typing import Optional # Added for type hinting in helper functions
from processing.pipeline.stages.individual_map_processing import IndividualMapProcessingStage
from processing.pipeline.asset_context import AssetProcessingContext
from rule_structure import AssetRule, SourceRule, FileRule, TransformSettings # Key models
from rule_structure import AssetRule, SourceRule, FileRule # Key models
from configuration import Configuration, GeneralSettings
# cv2 might be imported by the stage for interpolation constants, ensure it's mockable if so.
# For now, assume ipu handles interpolation details.

View File

@ -7,7 +7,7 @@ from typing import Optional # Added Optional for type hinting
from processing.pipeline.stages.map_merging import MapMergingStage
from processing.pipeline.asset_context import AssetProcessingContext
from rule_structure import AssetRule, SourceRule, FileRule, MergeSettings, MergeInputChannel
from rule_structure import AssetRule, SourceRule, FileRule
from configuration import Configuration
# Mock Helper Functions