# gui/prediction_handler.py import logging from pathlib import Path import time import os import re # Import regex import tempfile # Added for temporary extraction directory import zipfile # Added for zip file handling # import patoolib # Potential import for rar/7z - Add later if zip works from collections import defaultdict from typing import List, Dict, Any # For type hinting # --- PySide6 Imports --- from PySide6.QtCore import QObject, Signal, QThread, Slot # --- Backend Imports --- import sys script_dir = Path(__file__).parent project_root = script_dir.parent if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) try: from configuration import Configuration, ConfigurationError # AssetProcessor might not be needed directly anymore if logic is moved here # from asset_processor import AssetProcessor, AssetProcessingError from rule_structure import SourceRule, AssetRule, FileRule # Removed AssetType, ItemType import config as app_config # Import project's config module # Import the lists directly for easier access from config import ALLOWED_ASSET_TYPES, ALLOWED_FILE_TYPES BACKEND_AVAILABLE = True except ImportError as e: print(f"ERROR (PredictionHandler): Failed to import backend/config modules: {e}") # Define placeholders if imports fail Configuration = None # AssetProcessor = None ConfigurationError = Exception # AssetProcessingError = Exception SourceRule, AssetRule, FileRule, AssetType, ItemType = (None,)*5 # Placeholder for rule structures app_config = None # Placeholder for config BACKEND_AVAILABLE = False log = logging.getLogger(__name__) # Basic config if logger hasn't been set up elsewhere if not log.hasHandlers(): logging.basicConfig(level=logging.INFO, format='%(levelname)s (PredictHandler): %(message)s') # Helper function for classification (can be moved outside class if preferred) def classify_files(file_list: List[str], config: Configuration) -> Dict[str, List[Dict[str, Any]]]: """ Analyzes a list of files based on configuration rules to group them by asset and determine initial file properties. Args: file_list: List of absolute file paths. config: The loaded Configuration object containing naming rules. Returns: A dictionary grouping file information by predicted asset name. Example: { 'AssetName1': [ {'file_path': '/path/to/AssetName1_Color.png', 'item_type': 'Color', 'asset_name': 'AssetName1'}, {'file_path': '/path/to/AssetName1_Normal.png', 'item_type': 'Normal', 'asset_name': 'AssetName1'} ], # ... other assets } Returns an empty dict if classification fails or no files are provided. """ temp_grouped_files = defaultdict(list) extra_files_to_associate = [] # Store tuples: (file_path_str, filename) primary_asset_names = set() # Store asset names derived from map files # --- Validation --- if not file_list or not config: log.warning("Classification skipped: Missing file list or config.") return {} if not hasattr(config, 'compiled_map_keyword_regex') or not config.compiled_map_keyword_regex: log.warning("Classification skipped: Missing compiled map keyword regex.") # Don't return yet, might still find extras if not hasattr(config, 'compiled_extra_regex'): log.warning("Configuration object missing 'compiled_extra_regex'. Cannot classify extra files.") # Continue, but extras won't be found compiled_map_regex = getattr(config, 'compiled_map_keyword_regex', {}) compiled_extra_regex = getattr(config, 'compiled_extra_regex', []) num_map_rules = sum(len(patterns) for patterns in compiled_map_regex.values()) num_extra_rules = len(compiled_extra_regex) log.debug(f"Starting classification for {len(file_list)} files using {num_map_rules} map keyword patterns and {num_extra_rules} extra patterns.") # --- Initial Pass: Classify Maps and Identify Extras --- for file_path_str in file_list: file_path = Path(file_path_str) filename = file_path.name is_extra = False is_map = False # 1. Check for Extra Files FIRST for extra_pattern in compiled_extra_regex: if extra_pattern.search(filename): log.debug(f"File '{filename}' matched EXTRA pattern: {extra_pattern.pattern}") extra_files_to_associate.append((file_path_str, filename)) is_extra = True break # Stop checking extra patterns for this file if is_extra: continue # Move to the next file if it's an extra # 2. Check for Map Files # TODO: Consider rule priority if multiple patterns match the same file for target_type, patterns_list in compiled_map_regex.items(): for compiled_regex, original_keyword, rule_index in patterns_list: match = compiled_regex.search(filename) if match: # --- DEBUG LOG: Inspect available rule info --- log.debug(f" Match found! Rule Index: {rule_index}, Original Keyword: '{original_keyword}', Target Type: '{target_type}'") # Access the full rule details directly from the config's map_type_mapping list using the index matched_rule_details = None try: matched_rule_details = config.map_type_mapping[rule_index] # Access rule by index is_gloss_flag = matched_rule_details.get('is_gloss_source', False) # Get flag or default False log.debug(f" Associated rule details: {matched_rule_details}") log.debug(f" 'is_gloss_source' flag from rule: {is_gloss_flag}") except IndexError: log.warning(f" Could not access map_type_mapping rule at index {rule_index}. Cannot determine 'is_gloss_source' flag.") is_gloss_flag = False # Default if rule cannot be accessed # --- End DEBUG LOG --- matched_item_type = target_type # The standard type (e.g., MAP_COL) asset_name = None # --- Asset Name Extraction Logic (Simplified Heuristic) --- match_start_index = match.start(1) if match_start_index > 0: potential_name = filename[:match_start_index].rstrip('_- .') asset_name = potential_name if potential_name else file_path.stem else: asset_name = file_path.stem if not asset_name: asset_name = file_path.stem log.debug(f"File '{filename}' matched keyword '{original_keyword}' (rule {rule_index}) for item_type '{matched_item_type}'. Assigned asset name: '{asset_name}'") temp_grouped_files[asset_name].append({ 'file_path': file_path_str, 'item_type': matched_item_type, 'asset_name': asset_name, # --- Store the flag retrieved from the rule --- 'is_gloss_source': is_gloss_flag # Store the boolean value obtained above }) primary_asset_names.add(asset_name) # Mark this as a primary asset name is_map = True break # Stop checking patterns for this file if is_map: break # Stop checking target types for this file # 3. Handle Unmatched Files (Not Extra, Not Map) if not is_extra and not is_map: log.debug(f"File '{filename}' did not match any map/extra pattern. Grouping by stem as FILE_IGNORE.") asset_name = file_path.stem temp_grouped_files[asset_name].append({ 'file_path': file_path_str, 'item_type': "FILE_IGNORE", 'asset_name': asset_name }) # --- Determine Primary Asset Name --- # Simple heuristic: if only one name derived from maps, use it. Otherwise, log warning. final_primary_asset_name = None if len(primary_asset_names) == 1: final_primary_asset_name = list(primary_asset_names)[0] log.debug(f"Determined single primary asset name: '{final_primary_asset_name}'") elif len(primary_asset_names) > 1: # TODO: Implement a better heuristic for multiple assets (e.g., longest common prefix) final_primary_asset_name = list(primary_asset_names)[0] # Fallback: use the first one found log.warning(f"Multiple potential primary asset names found: {primary_asset_names}. Using '{final_primary_asset_name}' for associating extra files. Consider refining asset name extraction.") else: # No maps found, but maybe extras exist? Associate with the first asset group found. if temp_grouped_files and extra_files_to_associate: final_primary_asset_name = list(temp_grouped_files.keys())[0] log.warning(f"No map files found to determine primary asset name. Associating extras with first group found: '{final_primary_asset_name}'.") else: log.debug("No primary asset name determined (no maps found).") # --- Associate Extra Files --- if final_primary_asset_name and extra_files_to_associate: log.debug(f"Associating {len(extra_files_to_associate)} extra file(s) with primary asset '{final_primary_asset_name}'") for file_path_str, filename in extra_files_to_associate: temp_grouped_files[final_primary_asset_name].append({ 'file_path': file_path_str, 'item_type': "EXTRA", # Assign specific type 'asset_name': final_primary_asset_name # Associate with primary asset }) elif extra_files_to_associate: log.warning(f"Could not determine a primary asset name to associate {len(extra_files_to_associate)} extra file(s) with. They will be ignored.") # Optionally, create a separate 'Extras' asset group? # for file_path_str, filename in extra_files_to_associate: # temp_grouped_files["_Extras_"].append(...) log.debug(f"Classification complete. Found {len(temp_grouped_files)} potential assets.") return dict(temp_grouped_files) class PredictionHandler(QObject): """ Handles running predictions in a separate thread to avoid GUI freezes. Generates the initial SourceRule hierarchy based on file lists and presets. """ # --- Signals --- # Emitted when the hierarchical rule structure is ready for a single source rule_hierarchy_ready = Signal(list) # Emits a LIST containing ONE SourceRule object # Emitted when prediction/hierarchy generation for a source is done prediction_finished = Signal() # Emitted for status updates status_message = Signal(str, int) def __init__(self, parent=None): super().__init__(parent) self._is_running = False @property def is_running(self): return self._is_running # Removed _predict_single_asset method @Slot(str, list, str) # Explicitly define types for the slot def run_prediction(self, input_source_identifier: str, original_input_paths: list[str], preset_name: str): """ Generates the initial SourceRule hierarchy for a given source identifier (which could be a folder or archive path), extracting the actual file list first. file list, and preset name. Populates only overridable fields based on classification and preset defaults. This method is intended to be run in a separate QThread. """ thread_id = QThread.currentThread() log.info(f"[{time.time():.4f}][T:{thread_id}] --> Entered PredictionHandler.run_prediction.") # Note: file_list argument is renamed to original_input_paths for clarity, # but the signal passes the list of source paths, not the content files yet. # We use input_source_identifier as the primary path to analyze. log.info(f"VERIFY: PredictionHandler received request. Source: '{input_source_identifier}', Original Paths: {original_input_paths}, Preset: '{preset_name}'") # DEBUG Verify log.info(f"Source Identifier: '{input_source_identifier}', Preset: '{preset_name}'") if self._is_running: log.warning("Prediction is already running for another source. Aborting this run.") # Don't emit finished, let the running one complete. return if not BACKEND_AVAILABLE: log.error("Backend/config modules not available. Cannot run prediction.") self.status_message.emit("Error: Backend components missing.", 5000) # self.prediction_finished.emit() # Don't emit finished if never started properly return if not preset_name: log.warning("No preset selected for prediction.") self.status_message.emit("No preset selected.", 3000) # self.prediction_finished.emit() return # Check the identifier path itself source_path = Path(input_source_identifier) if not source_path.exists(): log.warning(f"Input source path does not exist: '{input_source_identifier}'. Skipping prediction.") self.status_message.emit("Input path not found.", 3000) self.rule_hierarchy_ready.emit([]) self.prediction_finished.emit() return self._is_running = True self.status_message.emit(f"Analyzing '{source_path.name}'...", 0) config: Configuration | None = None allowed_asset_types: List[str] = [] allowed_file_types: List[str] = [] # These are ItemType names try: config = Configuration(preset_name) # Load allowed types from the project's config module if app_config: allowed_asset_types = getattr(app_config, 'ALLOWED_ASSET_TYPES', []) allowed_file_types = getattr(app_config, 'ALLOWED_FILE_TYPES', []) log.debug(f"Loaded allowed AssetTypes: {allowed_asset_types}") log.debug(f"Loaded allowed FileTypes (ItemTypes): {allowed_file_types}") else: log.warning("Project config module not loaded. Cannot get allowed types.") except ConfigurationError as e: log.error(f"Failed to load configuration for preset '{preset_name}': {e}") self.status_message.emit(f"Error loading preset '{preset_name}': {e}", 5000) self.prediction_finished.emit() self._is_running = False return except Exception as e: log.exception(f"Unexpected error loading configuration or allowed types for preset '{preset_name}': {e}") self.status_message.emit(f"Unexpected error loading preset '{preset_name}'.", 5000) self.prediction_finished.emit() self._is_running = False return log.debug(f"DEBUG: Calling classify_files with file_list: {original_input_paths}") # DEBUG LOG # --- Perform Classification --- try: classified_assets = classify_files(original_input_paths, config) except Exception as e: log.exception(f"Error during file classification for source '{input_source_identifier}': {e}") self.status_message.emit(f"Error classifying files: {e}", 5000) self.prediction_finished.emit() self._is_running = False return if not classified_assets: log.warning(f"Classification yielded no assets for source '{input_source_identifier}'.") self.status_message.emit("No assets identified from files.", 3000) self.rule_hierarchy_ready.emit([]) # Emit empty list self.prediction_finished.emit() self._is_running = False return # --- Build the Hierarchy --- source_rules_list = [] try: # Determine SourceRule level overrides/defaults # Get supplier name from the config property supplier_identifier = config.supplier_name # Use the property # Create the single SourceRule for this input source source_rule = SourceRule( input_path=input_source_identifier, # Use the identifier provided supplier_identifier=supplier_identifier, # Set overridable field preset_name=preset_name # Pass the selected preset name ) log.debug(f"Created SourceRule for identifier: {input_source_identifier} with supplier: {supplier_identifier}") asset_rules = [] for asset_name, files_info in classified_assets.items(): if not files_info: continue # Skip empty asset groups # Determine AssetRule level overrides/defaults # TODO: Implement logic to determine asset_type based on file types present? # For now, default to MATERIAL if common material maps are present, else GENERIC. # This requires checking item_types in files_info. item_types_in_asset = {f_info['item_type'] for f_info in files_info} predicted_asset_type = "Surface" # Default to "Surface" string # Simple heuristic: if common material types exist, assume Surface # Use strings directly from config.py's ALLOWED_FILE_TYPES material_indicators = {"MAP_COL", "MAP_NRM", "MAP_ROUGH", "MAP_METAL", "MAP_AO", "MAP_DISP"} if any(it in material_indicators for it in item_types_in_asset): predicted_asset_type = "Surface" # Predict as "Surface" string # Ensure the predicted type is allowed, fallback if necessary # Now predicted_asset_type is already a string if allowed_asset_types and predicted_asset_type not in allowed_asset_types: log.warning(f"Predicted AssetType '{predicted_asset_type}' for asset '{asset_name}' is not in ALLOWED_ASSET_TYPES. Falling back.") # Fallback logic: use the default from config if allowed, else first allowed type default_type = getattr(app_config, 'DEFAULT_ASSET_CATEGORY', 'Surface') if default_type in allowed_asset_types: predicted_asset_type = default_type elif allowed_asset_types: predicted_asset_type = allowed_asset_types[0] else: pass # Keep the original prediction if allowed list is empty asset_rule = AssetRule( asset_name=asset_name, # This is determined by classification asset_type=predicted_asset_type, # Set overridable field (use the string) # asset_type_override=None # This is for user edits, leave as None initially ) log.debug(f"Created AssetRule for asset: {asset_name} with type: {predicted_asset_type}") file_rules = [] for file_info in files_info: # Determine FileRule level overrides/defaults item_type_override = file_info['item_type'] # From classification target_asset_name_override = file_info['asset_name'] # From classification # Ensure the predicted item type is allowed (check against prefixed version), skipping EXTRA and FILE_IGNORE # Only prefix if it's a map type that doesn't already have the prefix prefixed_item_type = f"MAP_{item_type_override}" if not item_type_override.startswith("MAP_") and item_type_override not in ["FILE_IGNORE", "EXTRA", "MODEL"] else item_type_override # Check if the (potentially prefixed) type is allowed, but only if it's not supposed to be ignored or extra if allowed_file_types and prefixed_item_type not in allowed_file_types and item_type_override not in ["FILE_IGNORE", "EXTRA"]: log.warning(f"Predicted ItemType '{item_type_override}' (checked as '{prefixed_item_type}') for file '{file_info['file_path']}' is not in ALLOWED_FILE_TYPES. Setting to FILE_IGNORE.") item_type_override = "FILE_IGNORE" # Fallback to FILE_IGNORE string # Output format is determined by the engine, not predicted here. Leave as None. output_format_override = None # --- DEBUG LOG: Inspect data before FileRule creation --- log.debug(f" Creating FileRule for: {file_info['file_path']}") log.debug(f" Using item_type_override: {item_type_override}") log.debug(f" Using target_asset_name_override: {target_asset_name_override}") # Explicitly check and log the flag value from file_info is_gloss_source_value = file_info.get('is_gloss_source', 'MISSING') # Get value or 'MISSING' log.debug(f" Value for 'is_gloss_source' from file_info: {is_gloss_source_value}") # --- End DEBUG LOG --- # TODO: Need to verify FileRule constructor accepts is_gloss_source # and pass is_gloss_source_value if it does. # Pass the retrieved flag value to the constructor file_rule = FileRule( file_path=file_info['file_path'], # This is static info based on input # --- Populate ONLY Overridable Fields --- item_type_override=item_type_override, target_asset_name_override=target_asset_name_override, output_format_override=output_format_override, is_gloss_source=is_gloss_source_value if isinstance(is_gloss_source_value, bool) else False, # Pass the flag, ensure boolean # --- Leave Static Fields as Default/None --- resolution_override=None, channel_merge_instructions={}, # etc. ) file_rules.append(file_rule) asset_rule.files = file_rules asset_rules.append(asset_rule) # Populate the SourceRule with its assets source_rule.assets = asset_rules log.debug(f"Built SourceRule '{source_rule.input_path}' with {len(asset_rules)} AssetRule(s).") source_rules_list.append(source_rule) # Add the single completed SourceRule except Exception as e: log.exception(f"Error building rule hierarchy for source '{input_source_identifier}': {e}") self.status_message.emit(f"Error building rules: {e}", 5000) # Don't emit hierarchy, just finish self.prediction_finished.emit() self._is_running = False # Removed erroneous temp_dir_obj cleanup return # --- Emit Results --- # DEBUG Verify: Log the hierarchy being emitted log.info(f"VERIFY: Emitting rule_hierarchy_ready with {len(source_rules_list)} SourceRule(s).") for i, rule in enumerate(source_rules_list): log.debug(f" VERIFY Rule {i}: Input='{rule.input_path}', Assets={len(rule.assets)}") log.info(f"[{time.time():.4f}][T:{thread_id}] Prediction run finished. Emitting hierarchy for '{input_source_identifier}'.") self.rule_hierarchy_ready.emit(source_rules_list) # Emit list containing the one SourceRule log.info(f"[{time.time():.4f}][T:{thread_id}] Emitted rule_hierarchy_ready signal.") # Removed prediction_results_ready signal emission self.status_message.emit(f"Analysis complete for '{input_source_identifier}'.", 3000) self.prediction_finished.emit() self._is_running = False # Removed temp_dir_obj cleanup - not relevant here log.info(f"[{time.time():.4f}][T:{thread_id}] <-- Exiting PredictionHandler.run_prediction.")