# gui/prediction_handler.py import logging from pathlib import Path import time import os import re # Import regex import tempfile # Added for temporary extraction directory import zipfile # Added for zip file handling # import patoolib # Potential import for rar/7z - Add later if zip works from collections import defaultdict from typing import List, Dict, Any # For type hinting # --- PySide6 Imports --- from PySide6.QtCore import QObject, Signal, QThread, Slot # --- Backend Imports --- import sys script_dir = Path(__file__).parent project_root = script_dir.parent if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) try: from configuration import Configuration, ConfigurationError, load_base_config # Import Configuration, ConfigurationError, and load_base_config # AssetProcessor might not be needed directly anymore if logic is moved here # from asset_processor import AssetProcessor, AssetProcessingError from rule_structure import SourceRule, AssetRule, FileRule # Removed AssetType, ItemType # Removed: import config as app_config # Import project's config module # Removed: Import the new dictionaries directly for easier access # Removed: from config import ASSET_TYPE_DEFINITIONS, FILE_TYPE_DEFINITIONS BACKEND_AVAILABLE = True except ImportError as e: print(f"ERROR (PredictionHandler): Failed to import backend/config modules: {e}") # Define placeholders if imports fail Configuration = None load_base_config = None # Placeholder ConfigurationError = Exception # AssetProcessingError = Exception SourceRule, AssetRule, FileRule = (None,)*3 # Placeholder for rule structures # Removed: AssetType, ItemType = (None,)*2 # Placeholder for types # Removed: app_config = None # Placeholder for config BACKEND_AVAILABLE = False log = logging.getLogger(__name__) # Basic config if logger hasn't been set up elsewhere if not log.hasHandlers(): logging.basicConfig(level=logging.INFO, format='%(levelname)s (PredictHandler): %(message)s') # Helper function for 
# Helper functions for classification (can be moved outside class if preferred)
def _find_map_match(filename, compiled_map_regex):
    """Return ``(target_type, original_keyword, rule_index, match)`` for the first map pattern hit, else ``None``."""
    # TODO: Consider rule priority if multiple patterns match the same file
    for target_type, patterns_list in compiled_map_regex.items():
        for compiled_regex, original_keyword, rule_index in patterns_list:
            match = compiled_regex.search(filename)
            if match:
                return target_type, original_keyword, rule_index, match
    return None


def _lookup_gloss_flag(config, rule_index):
    """Read the ``is_gloss_source`` flag of rule ``rule_index`` in ``config.map_type_mapping`` (False if missing)."""
    try:
        matched_rule_details = config.map_type_mapping[rule_index]
        is_gloss_flag = matched_rule_details.get('is_gloss_source', False)
    except IndexError:
        log.warning(f" Could not access map_type_mapping rule at index {rule_index} in config.settings. Cannot determine 'is_gloss_source' flag.")
        return False
    log.debug(f" Associated rule details: {matched_rule_details}")
    log.debug(f" 'is_gloss_source' flag from rule: {is_gloss_flag}")
    return is_gloss_flag


def _asset_name_from_match(match, filename, stem):
    """Derive an asset name from the text preceding the matched keyword, falling back to ``stem``."""
    # Patterns are expected to capture the keyword in group 1, but a pattern
    # without any group would make match.start(1) raise IndexError; use the
    # start of the whole match in that case.
    keyword_start = match.start(1) if match.re.groups else match.start()
    # keyword_start is -1 when group 1 exists but did not participate.
    prefix = filename[:keyword_start].rstrip('_- .') if keyword_start > 0 else ''
    return prefix or stem


def classify_files(file_list: List[str], config: Configuration) -> Dict[str, List[Dict[str, Any]]]:
    """
    Analyzes a list of files based on configuration rules to group them by
    asset and determine initial file properties.

    Each file is tested against the config's extra patterns first, then
    against its map keyword patterns. Files matching neither are grouped
    under their own stem with item type ``FILE_IGNORE``. Extra files are
    attached to a single "primary" asset: the only map-derived asset name, or
    the first one encountered when there are several, or the first group of
    any kind when no map was found.

    Args:
        file_list: List of file paths (strings).
        config: The loaded Configuration object. The attributes read here are
            ``compiled_map_keyword_regex`` (mapping of target type to an
            iterable of ``(compiled_regex, original_keyword, rule_index)``),
            ``compiled_extra_regex`` (iterable of compiled patterns) and
            ``map_type_mapping`` (indexable by ``rule_index``).

    Returns:
        A dictionary grouping file information by predicted asset name.
        Example:
        {
            'AssetName1': [
                {'file_path': '/path/to/AssetName1_Color.png', 'item_type': 'Color', 'asset_name': 'AssetName1'},
                {'file_path': '/path/to/AssetName1_Normal.png', 'item_type': 'Normal', 'asset_name': 'AssetName1'}
            ],
            # ... other assets
        }
        Map entries additionally carry an ``'is_gloss_source'`` boolean.
        Returns an empty dict if no files or no config are provided.
    """
    # --- Validation ---
    if not file_list or not config:
        log.warning("Classification skipped: Missing file list or config.")
        return {}

    if not getattr(config, 'compiled_map_keyword_regex', None):
        log.warning("Classification skipped: Missing compiled map keyword regex in config.")
        # Don't return yet, might still find extras
    if not hasattr(config, 'compiled_extra_regex'):
        log.warning("Configuration object missing 'compiled_extra_regex'. Cannot classify extra files.")
        # Continue, but extras won't be found

    # `or` also normalises attributes that exist but are None, which would
    # otherwise crash on .values() / len() right after the warnings above.
    compiled_map_regex = getattr(config, 'compiled_map_keyword_regex', None) or {}
    compiled_extra_regex = getattr(config, 'compiled_extra_regex', None) or []

    num_map_rules = sum(len(patterns) for patterns in compiled_map_regex.values())
    num_extra_rules = len(compiled_extra_regex)
    log.debug(f"Starting classification for {len(file_list)} files using {num_map_rules} map keyword patterns and {num_extra_rules} extra patterns.")

    temp_grouped_files = defaultdict(list)
    extra_files_to_associate = []  # Tuples: (file_path_str, filename)
    # Asset names derived from map files. A dict is used as an ordered set so
    # that "the first one found" is well defined and stable between runs
    # (iteration order of a set of strings is not).
    primary_asset_names: Dict[str, None] = {}

    # --- Initial Pass: Classify Maps and Identify Extras ---
    for file_path_str in file_list:
        file_path = Path(file_path_str)
        filename = file_path.name

        # 1. Check for Extra Files FIRST
        extra_pattern = next((p for p in compiled_extra_regex if p.search(filename)), None)
        if extra_pattern is not None:
            log.debug(f"File '{filename}' matched EXTRA pattern: {extra_pattern.pattern}")
            extra_files_to_associate.append((file_path_str, filename))
            continue

        # 2. Check for Map Files
        map_hit = _find_map_match(filename, compiled_map_regex)
        if map_hit is None:
            # 3. Handle Unmatched Files (Not Extra, Not Map)
            log.debug(f"File '{filename}' did not match any map/extra pattern. Grouping by stem as FILE_IGNORE.")
            asset_name = file_path.stem
            temp_grouped_files[asset_name].append({
                'file_path': file_path_str,
                'item_type': "FILE_IGNORE",
                'asset_name': asset_name,
            })
            continue

        target_type, original_keyword, rule_index, match = map_hit
        log.debug(f" Match found! Rule Index: {rule_index}, Original Keyword: '{original_keyword}', Target Type: '{target_type}'")
        is_gloss_flag = _lookup_gloss_flag(config, rule_index)
        asset_name = _asset_name_from_match(match, filename, file_path.stem)
        log.debug(f"File '{filename}' matched keyword '{original_keyword}' (rule {rule_index}) for item_type '{target_type}'. Assigned asset name: '{asset_name}'")
        temp_grouped_files[asset_name].append({
            'file_path': file_path_str,
            'item_type': target_type,  # The standard type (e.g., MAP_COL)
            'asset_name': asset_name,
            'is_gloss_source': is_gloss_flag,
        })
        primary_asset_names[asset_name] = None  # Mark this as a primary asset name

    # --- Determine Primary Asset Name ---
    # Simple heuristic: if only one name derived from maps, use it. Otherwise, log warning.
    final_primary_asset_name = None
    if len(primary_asset_names) == 1:
        final_primary_asset_name = next(iter(primary_asset_names))
        log.debug(f"Determined single primary asset name: '{final_primary_asset_name}'")
    elif primary_asset_names:
        # TODO: Implement a better heuristic for multiple assets (e.g., longest common prefix)
        final_primary_asset_name = next(iter(primary_asset_names))  # Fallback: first one found
        log.warning(f"Multiple potential primary asset names found: {list(primary_asset_names)}. Using '{final_primary_asset_name}' for associating extra files. Consider refining asset name extraction.")
    elif temp_grouped_files and extra_files_to_associate:
        # No maps found, but extras exist: associate with the first asset group found.
        final_primary_asset_name = next(iter(temp_grouped_files))
        log.warning(f"No map files found to determine primary asset name. Associating extras with first group found: '{final_primary_asset_name}'.")
    else:
        log.debug("No primary asset name determined (no maps found).")

    # --- Associate Extra Files ---
    if final_primary_asset_name and extra_files_to_associate:
        log.debug(f"Associating {len(extra_files_to_associate)} extra file(s) with primary asset '{final_primary_asset_name}'")
        for file_path_str, _filename in extra_files_to_associate:
            temp_grouped_files[final_primary_asset_name].append({
                'file_path': file_path_str,
                'item_type': "EXTRA",  # Assign specific type
                'asset_name': final_primary_asset_name,  # Associate with primary asset
            })
    elif extra_files_to_associate:
        log.warning(f"Could not determine a primary asset name to associate {len(extra_files_to_associate)} extra file(s) with. They will be ignored.")
        # Optionally, create a separate 'Extras' asset group?

    log.debug(f"Classification complete. Found {len(temp_grouped_files)} potential assets.")
    return dict(temp_grouped_files)


class PredictionHandler(QObject):
    """
    Handles running predictions in a separate thread to avoid GUI freezes.
    Generates the initial SourceRule hierarchy based on file lists and presets.
    """

    # --- Signals ---
    # Emitted when the hierarchical rule structure is ready for a single source
    rule_hierarchy_ready = Signal(list)  # Emits a LIST containing ONE SourceRule object
    # Emitted when prediction/hierarchy generation for a source is done (emits the input_source_identifier)
    prediction_finished = Signal(str)
    # Emitted for status updates (message, timeout in ms)
    status_message = Signal(str, int)

    def __init__(self, parent=None):
        super().__init__(parent)
        self._is_running = False

    @property
    def is_running(self):
        """True while a call to run_prediction is in progress."""
        return self._is_running

    @Slot(str, list, str)  # Explicitly define types for the slot
    def run_prediction(self, input_source_identifier: str, original_input_paths: list[str], preset_name: str):
        """
        Generates the initial SourceRule hierarchy for a given source
        identifier (which could be a folder or archive path), file list, and
        preset name. Populates only overridable fields based on
        classification and preset defaults.

        This method is intended to be run in a separate QThread.

        Args:
            input_source_identifier: Path of the source being analysed; it
                must exist on disk. Echoed back through prediction_finished.
            original_input_paths: File paths handed to classify_files.
            preset_name: Name of the preset used to build the Configuration.

        Results are delivered through signals: rule_hierarchy_ready (a list
        with one SourceRule, or an empty list when nothing was identified),
        status_message, and prediction_finished. When the request is rejected
        up front (already running, backend missing, no preset) no
        prediction_finished is emitted.
        """
        # NOTE: this is the QThread object, not a numeric id; it is only used for log output.
        thread_id = QThread.currentThread()
        log.info(f"[{time.time():.4f}][T:{thread_id}] --> Entered PredictionHandler.run_prediction.")
        log.info(f"VERIFY: PredictionHandler received request. Source: '{input_source_identifier}', Original Paths: {original_input_paths}, Preset: '{preset_name}'")  # DEBUG Verify
        log.info(f"Source Identifier: '{input_source_identifier}', Preset: '{preset_name}'")

        if self._is_running:
            log.warning("Prediction is already running for another source. Aborting this run.")
            # Don't emit finished, let the running one complete.
            return
        if not BACKEND_AVAILABLE:
            log.error("Backend/config modules not available. Cannot run prediction.")
            self.status_message.emit("Error: Backend components missing.", 5000)
            return
        if not preset_name:
            log.warning("No preset selected for prediction.")
            self.status_message.emit("No preset selected.", 3000)
            return

        # Check the identifier path itself
        source_path = Path(input_source_identifier)
        if not source_path.exists():
            log.warning(f"Input source path does not exist: '{input_source_identifier}'. Skipping prediction.")
            self.status_message.emit("Input path not found.", 3000)
            self.rule_hierarchy_ready.emit([])
            self.prediction_finished.emit(input_source_identifier)
            return

        self._is_running = True
        try:
            self.status_message.emit(f"Analyzing '{source_path.name}'...", 0)
            succeeded = self._predict(input_source_identifier, original_input_paths, preset_name, thread_id)
        finally:
            # Reset in a finally block so an unexpected exception (e.g. raised
            # from a connected slot during emit) cannot leave the handler
            # permanently marked as running.
            self._is_running = False

        if succeeded:
            log.info(f"[{time.time():.4f}][T:{thread_id}] <-- Exiting PredictionHandler.run_prediction.")

    def _predict(self, input_source_identifier, original_input_paths, preset_name, thread_id) -> bool:
        """Load the preset, classify the files, build and emit the hierarchy. Returns True on success."""
        try:
            config = Configuration(preset_name)
        except ConfigurationError as e:
            log.error(f"Failed to load configuration for preset '{preset_name}': {e}")
            self.status_message.emit(f"Error loading preset '{preset_name}': {e}", 5000)
            self.prediction_finished.emit(input_source_identifier)
            return False
        except Exception as e:
            log.exception(f"Unexpected error loading configuration or allowed types for preset '{preset_name}': {e}")
            self.status_message.emit(f"Unexpected error loading preset '{preset_name}'.", 5000)
            self.prediction_finished.emit(input_source_identifier)
            return False

        log.debug(f"DEBUG: Calling classify_files with file_list: {original_input_paths}")

        # --- Perform Classification ---
        try:
            classified_assets = classify_files(original_input_paths, config)
        except Exception as e:
            log.exception(f"Error during file classification for source '{input_source_identifier}': {e}")
            self.status_message.emit(f"Error classifying files: {e}", 5000)
            self.prediction_finished.emit(input_source_identifier)
            return False

        if not classified_assets:
            log.warning(f"Classification yielded no assets for source '{input_source_identifier}'.")
            self.status_message.emit("No assets identified from files.", 3000)
            self.rule_hierarchy_ready.emit([])  # Emit empty list
            self.prediction_finished.emit(input_source_identifier)
            return False

        # --- Build the Hierarchy ---
        try:
            source_rule = self._build_source_rule(input_source_identifier, preset_name, config, classified_assets)
        except Exception as e:
            log.exception(f"Error building rule hierarchy for source '{input_source_identifier}': {e}")
            self.status_message.emit(f"Error building rules: {e}", 5000)
            # Don't emit hierarchy, just finish
            self.prediction_finished.emit(input_source_identifier)
            return False
        source_rules_list = [source_rule]

        # --- Emit Results ---
        log.info(f"VERIFY: Emitting rule_hierarchy_ready with {len(source_rules_list)} SourceRule(s).")
        for i, rule in enumerate(source_rules_list):
            log.debug(f" VERIFY Rule {i}: Input='{rule.input_path}', Assets={len(rule.assets)}")
        log.info(f"[{time.time():.4f}][T:{thread_id}] Prediction run finished. Emitting hierarchy for '{input_source_identifier}'.")
        self.rule_hierarchy_ready.emit(source_rules_list)  # Emit list containing the one SourceRule
        log.info(f"[{time.time():.4f}][T:{thread_id}] Emitted rule_hierarchy_ready signal.")
        self.status_message.emit(f"Analysis complete for '{input_source_identifier}'.", 3000)
        self.prediction_finished.emit(input_source_identifier)
        return True

    @classmethod
    def _build_source_rule(cls, input_source_identifier, preset_name, config, classified_assets):
        """Create the single SourceRule for this input and populate its AssetRules and FileRules."""
        supplier_identifier = config.supplier_name
        source_rule = SourceRule(
            input_path=input_source_identifier,  # Use the identifier provided
            supplier_identifier=supplier_identifier,  # Set overridable field
            preset_name=preset_name,  # Pass the selected preset name
        )
        log.debug(f"Created SourceRule for identifier: {input_source_identifier} with supplier: {supplier_identifier}")

        # Allowed types come from the config's internal core settings; both
        # lookups are invariant across assets, so they are done once here.
        asset_type_definitions = config._core_settings.get('ASSET_TYPE_DEFINITIONS', {})
        log.debug(f"Loaded AssetType Definitions from config: {list(asset_type_definitions.keys())}")
        file_type_definitions = config._core_settings.get('FILE_TYPE_DEFINITIONS', {})
        log.debug(f"Loaded FileType Definitions (ItemTypes) from config: {list(file_type_definitions.keys())}")

        asset_rules = []
        for asset_name, files_info in classified_assets.items():
            if not files_info:
                continue  # Skip empty asset groups
            predicted_asset_type = cls._predict_asset_type(asset_name, config, asset_type_definitions)
            asset_rule = AssetRule(
                asset_name=asset_name,  # This is determined by classification
                asset_type=predicted_asset_type,  # Set overridable field (use the string)
            )
            log.debug(f"Created AssetRule for asset: {asset_name} with type: {predicted_asset_type}")
            asset_rule.files = [cls._build_file_rule(file_info, file_type_definitions) for file_info in files_info]
            asset_rules.append(asset_rule)

        source_rule.assets = asset_rules
        log.debug(f"Built SourceRule '{source_rule.input_path}' with {len(asset_rules)} AssetRule(s).")
        return source_rule

    @staticmethod
    def _predict_asset_type(asset_name, config, asset_type_definitions):
        """Return the asset type string for an asset, constrained to the configured definitions when present."""
        # TODO: Implement logic to determine asset_type based on the item types
        # present in the asset. For now every asset is predicted as "Surface".
        predicted_asset_type = "Surface"
        if asset_type_definitions and predicted_asset_type not in asset_type_definitions:
            log.warning(f"Predicted AssetType '{predicted_asset_type}' for asset '{asset_name}' is not in ASSET_TYPE_DEFINITIONS from config. Falling back.")
            # Fallback: the config default if allowed, else the first allowed type.
            default_type = config.default_asset_category
            if default_type in asset_type_definitions:
                predicted_asset_type = default_type
            else:
                predicted_asset_type = next(iter(asset_type_definitions))
        return predicted_asset_type

    @staticmethod
    def _build_file_rule(file_info, file_type_definitions):
        """Create the FileRule for one classified file entry."""
        base_item_type = file_info['item_type']  # Type from classification (e.g., COL, NRM, EXTRA)
        target_asset_name_override = file_info['asset_name']  # From classification

        # Determine the final item_type string (prefix maps, check if allowed)
        final_item_type = base_item_type
        if not base_item_type.startswith("MAP_") and base_item_type not in ("FILE_IGNORE", "EXTRA", "MODEL"):
            final_item_type = f"MAP_{base_item_type}"
        if file_type_definitions and final_item_type not in file_type_definitions and base_item_type not in ("FILE_IGNORE", "EXTRA"):
            log.warning(f"Predicted ItemType '{base_item_type}' (checked as '{final_item_type}') for file '{file_info['file_path']}' is not in FILE_TYPE_DEFINITIONS from config. Setting base type to FILE_IGNORE.")
            final_item_type = "FILE_IGNORE"

        log.debug(f" Creating FileRule for: {file_info['file_path']}")
        log.debug(f" Base Item Type (from classification): {base_item_type}")
        log.debug(f" Final Item Type (for model): {final_item_type}")
        log.debug(f" Target Asset Name Override: {target_asset_name_override}")

        # Look up the standard type using the final (prefixed) item type. When
        # the definition exists but carries no explicit 'standard_type', the
        # base item type (the alias used by the preset) is used instead.
        standard_map_type = None
        file_type_details = file_type_definitions.get(final_item_type)
        log.debug(f" DEBUG: file_type_definitions.get({final_item_type}) = {file_type_details}")
        if file_type_details:
            standard_map_type = file_type_details.get('standard_type')
            log.debug(f" DEBUG: Explicit standard_type from details = {standard_map_type}")
        if standard_map_type is None:
            if final_item_type in file_type_definitions:
                log.debug(f" No explicit 'standard_type' found for item type '{final_item_type}'. Using base_item_type ('{base_item_type}') as standard_map_type.")
                standard_map_type = base_item_type
            else:
                log.debug(f" Could not determine standard_map_type for base '{base_item_type}' / final '{final_item_type}'. Setting to None.")
        log.debug(f" DEBUG: Final standard_map_type variable value = {standard_map_type}")

        # Only map entries carry the flag; anything that is not a real bool
        # (including the 'MISSING' sentinel) is treated as False.
        is_gloss_source_value = file_info.get('is_gloss_source', 'MISSING')
        log.debug(f" Value for 'is_gloss_source' from file_info: {is_gloss_source_value}")

        return FileRule(
            file_path=file_info['file_path'],  # This is static info based on input
            item_type=final_item_type,
            # --- Populate ONLY Overridable Fields ---
            # Initialize override with the classified type for display
            item_type_override=final_item_type,
            target_asset_name_override=target_asset_name_override,
            # Output format is determined by the engine, not predicted here.
            output_format_override=None,
            is_gloss_source=is_gloss_source_value if isinstance(is_gloss_source_value, bool) else False,
            standard_map_type=standard_map_type,
            # --- Leave Static Fields as Default/None ---
            resolution_override=None,
            channel_merge_instructions={},
        )