Asset-Frameworker/gui/prediction_handler.py
Rusfort 6971b8189f Data Flow Overhaul
Known regressions in current commit:
- No "extra" files
- GLOSS map does not look corrected
- "override" flag is not respected
2025-05-01 09:13:20 +02:00

417 lines
22 KiB
Python

# gui/prediction_handler.py
import logging
from pathlib import Path
import time
import os
import re # Import regex
import tempfile # Added for temporary extraction directory
import zipfile # Added for zip file handling
# import patoolib # Potential import for rar/7z - Add later if zip works
from collections import defaultdict
from typing import List, Dict, Any # For type hinting
# --- PySide6 Imports ---
from PySide6.QtCore import QObject, Signal, QThread, Slot
# --- Backend Imports ---
import sys
script_dir = Path(__file__).parent
project_root = script_dir.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
try:
from configuration import Configuration, ConfigurationError
# AssetProcessor might not be needed directly anymore if logic is moved here
# from asset_processor import AssetProcessor, AssetProcessingError
from rule_structure import SourceRule, AssetRule, FileRule # Removed AssetType, ItemType
import config as app_config # Import project's config module
# Import the lists directly for easier access
from config import ALLOWED_ASSET_TYPES, ALLOWED_FILE_TYPES
BACKEND_AVAILABLE = True
except ImportError as e:
print(f"ERROR (PredictionHandler): Failed to import backend/config modules: {e}")
# Define placeholders if imports fail
Configuration = None
# AssetProcessor = None
ConfigurationError = Exception
# AssetProcessingError = Exception
SourceRule, AssetRule, FileRule, AssetType, ItemType = (None,)*5 # Placeholder for rule structures
app_config = None # Placeholder for config
BACKEND_AVAILABLE = False
log = logging.getLogger(__name__)
# Basic config if logger hasn't been set up elsewhere
if not log.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(levelname)s (PredictHandler): %(message)s')
# Helper function for classification (can be moved outside class if preferred)
def classify_files(file_list: List[str], config: Configuration) -> Dict[str, List[Dict[str, Any]]]:
"""
Analyzes a list of files based on configuration rules to group them by asset
and determine initial file properties.
Args:
file_list: List of absolute file paths.
config: The loaded Configuration object containing naming rules.
Returns:
A dictionary grouping file information by predicted asset name.
Example:
{
'AssetName1': [
{'file_path': '/path/to/AssetName1_Color.png', 'item_type': 'Color', 'asset_name': 'AssetName1'},
{'file_path': '/path/to/AssetName1_Normal.png', 'item_type': 'Normal', 'asset_name': 'AssetName1'}
],
# ... other assets
}
Returns an empty dict if classification fails or no files are provided.
"""
temp_grouped_files = defaultdict(list)
extra_files_to_associate = [] # Store tuples: (file_path_str, filename)
primary_asset_names = set() # Store asset names derived from map files
# --- Validation ---
if not file_list or not config:
log.warning("Classification skipped: Missing file list or config.")
return {}
if not hasattr(config, 'compiled_map_keyword_regex') or not config.compiled_map_keyword_regex:
log.warning("Classification skipped: Missing compiled map keyword regex.")
# Don't return yet, might still find extras
if not hasattr(config, 'compiled_extra_regex'):
log.warning("Configuration object missing 'compiled_extra_regex'. Cannot classify extra files.")
# Continue, but extras won't be found
compiled_map_regex = getattr(config, 'compiled_map_keyword_regex', {})
compiled_extra_regex = getattr(config, 'compiled_extra_regex', [])
num_map_rules = sum(len(patterns) for patterns in compiled_map_regex.values())
num_extra_rules = len(compiled_extra_regex)
log.debug(f"Starting classification for {len(file_list)} files using {num_map_rules} map keyword patterns and {num_extra_rules} extra patterns.")
# --- Initial Pass: Classify Maps and Identify Extras ---
for file_path_str in file_list:
file_path = Path(file_path_str)
filename = file_path.name
is_extra = False
is_map = False
# 1. Check for Extra Files FIRST
for extra_pattern in compiled_extra_regex:
if extra_pattern.search(filename):
log.debug(f"File '{filename}' matched EXTRA pattern: {extra_pattern.pattern}")
extra_files_to_associate.append((file_path_str, filename))
is_extra = True
break # Stop checking extra patterns for this file
if is_extra:
continue # Move to the next file if it's an extra
# 2. Check for Map Files
# TODO: Consider rule priority if multiple patterns match the same file
for target_type, patterns_list in compiled_map_regex.items():
for compiled_regex, original_keyword, rule_index in patterns_list:
match = compiled_regex.search(filename)
if match:
matched_item_type = target_type # The standard type (e.g., MAP_COL)
asset_name = None
# --- Asset Name Extraction Logic (Simplified Heuristic) ---
match_start_index = match.start(1)
if match_start_index > 0:
potential_name = filename[:match_start_index].rstrip('_- .')
asset_name = potential_name if potential_name else file_path.stem
else:
asset_name = file_path.stem
if not asset_name: asset_name = file_path.stem
log.debug(f"File '{filename}' matched keyword '{original_keyword}' (rule {rule_index}) for item_type '{matched_item_type}'. Assigned asset name: '{asset_name}'")
temp_grouped_files[asset_name].append({
'file_path': file_path_str,
'item_type': matched_item_type,
'asset_name': asset_name
})
primary_asset_names.add(asset_name) # Mark this as a primary asset name
is_map = True
break # Stop checking patterns for this file
if is_map:
break # Stop checking target types for this file
# 3. Handle Unmatched Files (Not Extra, Not Map)
if not is_extra and not is_map:
log.debug(f"File '{filename}' did not match any map/extra pattern. Grouping by stem as FILE_IGNORE.")
asset_name = file_path.stem
temp_grouped_files[asset_name].append({
'file_path': file_path_str,
'item_type': "FILE_IGNORE",
'asset_name': asset_name
})
# --- Determine Primary Asset Name ---
# Simple heuristic: if only one name derived from maps, use it. Otherwise, log warning.
final_primary_asset_name = None
if len(primary_asset_names) == 1:
final_primary_asset_name = list(primary_asset_names)[0]
log.debug(f"Determined single primary asset name: '{final_primary_asset_name}'")
elif len(primary_asset_names) > 1:
# TODO: Implement a better heuristic for multiple assets (e.g., longest common prefix)
final_primary_asset_name = list(primary_asset_names)[0] # Fallback: use the first one found
log.warning(f"Multiple potential primary asset names found: {primary_asset_names}. Using '{final_primary_asset_name}' for associating extra files. Consider refining asset name extraction.")
else:
# No maps found, but maybe extras exist? Associate with the first asset group found.
if temp_grouped_files and extra_files_to_associate:
final_primary_asset_name = list(temp_grouped_files.keys())[0]
log.warning(f"No map files found to determine primary asset name. Associating extras with first group found: '{final_primary_asset_name}'.")
else:
log.debug("No primary asset name determined (no maps found).")
# --- Associate Extra Files ---
if final_primary_asset_name and extra_files_to_associate:
log.debug(f"Associating {len(extra_files_to_associate)} extra file(s) with primary asset '{final_primary_asset_name}'")
for file_path_str, filename in extra_files_to_associate:
temp_grouped_files[final_primary_asset_name].append({
'file_path': file_path_str,
'item_type': "EXTRA", # Assign specific type
'asset_name': final_primary_asset_name # Associate with primary asset
})
elif extra_files_to_associate:
log.warning(f"Could not determine a primary asset name to associate {len(extra_files_to_associate)} extra file(s) with. They will be ignored.")
# Optionally, create a separate 'Extras' asset group?
# for file_path_str, filename in extra_files_to_associate:
# temp_grouped_files["_Extras_"].append(...)
log.debug(f"Classification complete. Found {len(temp_grouped_files)} potential assets.")
return dict(temp_grouped_files)
class PredictionHandler(QObject):
"""
Handles running predictions in a separate thread to avoid GUI freezes.
Generates the initial SourceRule hierarchy based on file lists and presets.
"""
# --- Signals ---
# Emitted when the hierarchical rule structure is ready for a single source
rule_hierarchy_ready = Signal(list) # Emits a LIST containing ONE SourceRule object
# Emitted when prediction/hierarchy generation for a source is done
prediction_finished = Signal()
# Emitted for status updates
status_message = Signal(str, int)
def __init__(self, parent=None):
super().__init__(parent)
self._is_running = False
@property
def is_running(self):
return self._is_running
# Removed _predict_single_asset method
@Slot(str, list, str) # Explicitly define types for the slot
def run_prediction(self, input_source_identifier: str, original_input_paths: list[str], preset_name: str):
"""
Generates the initial SourceRule hierarchy for a given source identifier
(which could be a folder or archive path), extracting the actual file list first.
file list, and preset name. Populates only overridable fields based on
classification and preset defaults.
This method is intended to be run in a separate QThread.
"""
thread_id = QThread.currentThread()
log.info(f"[{time.time():.4f}][T:{thread_id}] --> Entered PredictionHandler.run_prediction.")
# Note: file_list argument is renamed to original_input_paths for clarity,
# but the signal passes the list of source paths, not the content files yet.
# We use input_source_identifier as the primary path to analyze.
log.info(f"VERIFY: PredictionHandler received request. Source: '{input_source_identifier}', Original Paths: {original_input_paths}, Preset: '{preset_name}'") # DEBUG Verify
log.info(f"Source Identifier: '{input_source_identifier}', Preset: '{preset_name}'")
if self._is_running:
log.warning("Prediction is already running for another source. Aborting this run.")
# Don't emit finished, let the running one complete.
return
if not BACKEND_AVAILABLE:
log.error("Backend/config modules not available. Cannot run prediction.")
self.status_message.emit("Error: Backend components missing.", 5000)
# self.prediction_finished.emit() # Don't emit finished if never started properly
return
if not preset_name:
log.warning("No preset selected for prediction.")
self.status_message.emit("No preset selected.", 3000)
# self.prediction_finished.emit()
return
# Check the identifier path itself
source_path = Path(input_source_identifier)
if not source_path.exists():
log.warning(f"Input source path does not exist: '{input_source_identifier}'. Skipping prediction.")
self.status_message.emit("Input path not found.", 3000)
self.rule_hierarchy_ready.emit([])
self.prediction_finished.emit()
return
self._is_running = True
self.status_message.emit(f"Analyzing '{source_path.name}'...", 0)
config: Configuration | None = None
allowed_asset_types: List[str] = []
allowed_file_types: List[str] = [] # These are ItemType names
try:
config = Configuration(preset_name)
# Load allowed types from the project's config module
if app_config:
allowed_asset_types = getattr(app_config, 'ALLOWED_ASSET_TYPES', [])
allowed_file_types = getattr(app_config, 'ALLOWED_FILE_TYPES', [])
log.debug(f"Loaded allowed AssetTypes: {allowed_asset_types}")
log.debug(f"Loaded allowed FileTypes (ItemTypes): {allowed_file_types}")
else:
log.warning("Project config module not loaded. Cannot get allowed types.")
except ConfigurationError as e:
log.error(f"Failed to load configuration for preset '{preset_name}': {e}")
self.status_message.emit(f"Error loading preset '{preset_name}': {e}", 5000)
self.prediction_finished.emit()
self._is_running = False
return
except Exception as e:
log.exception(f"Unexpected error loading configuration or allowed types for preset '{preset_name}': {e}")
self.status_message.emit(f"Unexpected error loading preset '{preset_name}'.", 5000)
self.prediction_finished.emit()
self._is_running = False
return
log.debug(f"DEBUG: Calling classify_files with file_list: {original_input_paths}") # DEBUG LOG
# --- Perform Classification ---
try:
classified_assets = classify_files(original_input_paths, config)
except Exception as e:
log.exception(f"Error during file classification for source '{input_source_identifier}': {e}")
self.status_message.emit(f"Error classifying files: {e}", 5000)
self.prediction_finished.emit()
self._is_running = False
return
if not classified_assets:
log.warning(f"Classification yielded no assets for source '{input_source_identifier}'.")
self.status_message.emit("No assets identified from files.", 3000)
self.rule_hierarchy_ready.emit([]) # Emit empty list
self.prediction_finished.emit()
self._is_running = False
return
# --- Build the Hierarchy ---
source_rules_list = []
try:
# Determine SourceRule level overrides/defaults
# Get supplier name from the config property
supplier_identifier = config.supplier_name # Use the property
# Create the single SourceRule for this input source
source_rule = SourceRule(
input_path=input_source_identifier, # Use the identifier provided
supplier_identifier=supplier_identifier # Set overridable field
)
log.debug(f"Created SourceRule for identifier: {input_source_identifier} with supplier: {supplier_identifier}")
asset_rules = []
for asset_name, files_info in classified_assets.items():
if not files_info: continue # Skip empty asset groups
# Determine AssetRule level overrides/defaults
# TODO: Implement logic to determine asset_type based on file types present?
# For now, default to MATERIAL if common material maps are present, else GENERIC.
# This requires checking item_types in files_info.
item_types_in_asset = {f_info['item_type'] for f_info in files_info}
predicted_asset_type = "Surface" # Default to "Surface" string
# Simple heuristic: if common material types exist, assume Surface
# Use strings directly from config.py's ALLOWED_FILE_TYPES
material_indicators = {"MAP_COL", "MAP_NRM", "MAP_ROUGH", "MAP_METAL", "MAP_AO", "MAP_DISP"}
if any(it in material_indicators for it in item_types_in_asset):
predicted_asset_type = "Surface" # Predict as "Surface" string
# Ensure the predicted type is allowed, fallback if necessary
# Now predicted_asset_type is already a string
if allowed_asset_types and predicted_asset_type not in allowed_asset_types:
log.warning(f"Predicted AssetType '{predicted_asset_type}' for asset '{asset_name}' is not in ALLOWED_ASSET_TYPES. Falling back.")
# Fallback logic: use the default from config if allowed, else first allowed type
default_type = getattr(app_config, 'DEFAULT_ASSET_CATEGORY', 'Surface')
if default_type in allowed_asset_types:
predicted_asset_type = default_type
elif allowed_asset_types:
predicted_asset_type = allowed_asset_types[0]
else:
pass # Keep the original prediction if allowed list is empty
asset_rule = AssetRule(
asset_name=asset_name, # This is determined by classification
asset_type=predicted_asset_type, # Set overridable field (use the string)
# asset_type_override=None # This is for user edits, leave as None initially
)
log.debug(f"Created AssetRule for asset: {asset_name} with type: {predicted_asset_type}")
file_rules = []
for file_info in files_info:
# Determine FileRule level overrides/defaults
item_type_override = file_info['item_type'] # From classification
target_asset_name_override = file_info['asset_name'] # From classification
# Ensure the predicted item type is allowed (check against prefixed version), skipping EXTRA and FILE_IGNORE
# Only prefix if it's a map type that doesn't already have the prefix
prefixed_item_type = f"MAP_{item_type_override}" if not item_type_override.startswith("MAP_") and item_type_override not in ["FILE_IGNORE", "EXTRA", "MODEL"] else item_type_override
# Check if the (potentially prefixed) type is allowed, but only if it's not supposed to be ignored or extra
if allowed_file_types and prefixed_item_type not in allowed_file_types and item_type_override not in ["FILE_IGNORE", "EXTRA"]:
log.warning(f"Predicted ItemType '{item_type_override}' (checked as '{prefixed_item_type}') for file '{file_info['file_path']}' is not in ALLOWED_FILE_TYPES. Setting to FILE_IGNORE.")
item_type_override = "FILE_IGNORE" # Fallback to FILE_IGNORE string
# Output format is determined by the engine, not predicted here. Leave as None.
output_format_override = None
file_rule = FileRule(
file_path=file_info['file_path'], # This is static info based on input
# --- Populate ONLY Overridable Fields ---
item_type_override=item_type_override,
target_asset_name_override=target_asset_name_override,
output_format_override=output_format_override,
# --- Leave Static Fields as Default/None ---
resolution_override=None,
channel_merge_instructions={},
# etc.
)
file_rules.append(file_rule)
asset_rule.files = file_rules
asset_rules.append(asset_rule)
# Populate the SourceRule with its assets
source_rule.assets = asset_rules
log.debug(f"Built SourceRule '{source_rule.input_path}' with {len(asset_rules)} AssetRule(s).")
source_rules_list.append(source_rule) # Add the single completed SourceRule
except Exception as e:
log.exception(f"Error building rule hierarchy for source '{input_source_identifier}': {e}")
self.status_message.emit(f"Error building rules: {e}", 5000)
# Don't emit hierarchy, just finish
self.prediction_finished.emit()
self._is_running = False
# Removed erroneous temp_dir_obj cleanup
return
# --- Emit Results ---
# DEBUG Verify: Log the hierarchy being emitted
log.info(f"VERIFY: Emitting rule_hierarchy_ready with {len(source_rules_list)} SourceRule(s).")
for i, rule in enumerate(source_rules_list):
log.debug(f" VERIFY Rule {i}: Input='{rule.input_path}', Assets={len(rule.assets)}")
log.info(f"[{time.time():.4f}][T:{thread_id}] Prediction run finished. Emitting hierarchy for '{input_source_identifier}'.")
self.rule_hierarchy_ready.emit(source_rules_list) # Emit list containing the one SourceRule
log.info(f"[{time.time():.4f}][T:{thread_id}] Emitted rule_hierarchy_ready signal.")
# Removed prediction_results_ready signal emission
self.status_message.emit(f"Analysis complete for '{input_source_identifier}'.", 3000)
self.prediction_finished.emit()
self._is_running = False
# Removed temp_dir_obj cleanup - not relevant here
log.info(f"[{time.time():.4f}][T:{thread_id}] <-- Exiting PredictionHandler.run_prediction.")