# Asset-Frameworker/gui/prediction_handler.py
import logging
from pathlib import Path
import time
import os
import re
import tempfile
import zipfile
from collections import defaultdict, Counter
from typing import List, Dict, Any
# --- PySide6 Imports ---
from PySide6.QtCore import QObject, Slot # Keep QObject for parent type hint, Slot for classify_files if kept as method
# Removed Signal, QThread as they are handled by BasePredictionHandler or caller
# --- Backend Imports ---
import sys
# Make the project root (parent of this gui/ directory) importable so the
# absolute backend imports below resolve even when run outside the package.
script_dir = Path(__file__).parent
project_root = script_dir.parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))
try:
    from configuration import Configuration, ConfigurationError
    from rule_structure import SourceRule, AssetRule, FileRule
    from .base_prediction_handler import BasePredictionHandler
    BACKEND_AVAILABLE = True
except ImportError as e:
    # Degrade gracefully: bind sentinels so the module still imports, and let
    # run_prediction() fail with a clear error via BACKEND_AVAILABLE.
    # NOTE(review): load_base_config is reset here but never imported above —
    # looks like a leftover from an earlier import list; confirm and remove.
    print(f"ERROR (RuleBasedPredictionHandler): Failed to import backend/config/base modules: {e}")
    Configuration = None
    load_base_config = None
    ConfigurationError = Exception
    SourceRule, AssetRule, FileRule = (None,)*3
    BACKEND_AVAILABLE = False
log = logging.getLogger(__name__)
# Fallback console handler only if the host app has not configured logging.
if not log.hasHandlers():
    logging.basicConfig(level=logging.INFO, format='%(levelname)s (RuleBasedPredictHandler): %(message)s')
def classify_files(file_list: List[str], config: Configuration) -> Dict[str, List[Dict[str, Any]]]:
    """
    Analyzes a list of files based on configuration rules using a two-pass approach
    to group them by asset and determine initial file properties.

    Pass 1: Identifies and classifies prioritized bit depth variants.
    Pass 2: Classifies extras, general maps (downgrading if primary exists), and ignores.

    Args:
        file_list: List of absolute file paths.
        config: The loaded Configuration object containing naming rules.

    Returns:
        A dictionary grouping file information by predicted asset name.
        Example:
            {
                'AssetName1': [
                    {'file_path': '/path/to/AssetName1_DISP16.png', 'item_type': 'DISP', 'asset_name': 'AssetName1', 'is_gloss_source': False},
                    {'file_path': '/path/to/AssetName1_DISP.png', 'item_type': 'EXTRA', 'asset_name': 'AssetName1', 'is_gloss_source': False},
                    {'file_path': '/path/to/AssetName1_Color.png', 'item_type': 'COL', 'asset_name': 'AssetName1', 'is_gloss_source': False}
                ],
                # ... other assets
            }
        Returns an empty dict if classification fails or no files are provided.
    """
    # NOTE(review): many log messages below contain the literal '(unknown)'
    # where a filename interpolation likely belonged (the strings are f-strings
    # with other placeholders intact) — looks like redaction; confirm vs. VCS.
    temp_grouped_files = defaultdict(list)   # asset_name -> list of file-info dicts
    extra_files_to_associate = []            # (file_path_str, filename) pairs found in Pass 2
    primary_asset_names = set()              # asset names that got a Pass-1 primary map
    primary_assignments = set()              # (asset_name, item_type) pairs claimed in Pass 1
    processed_in_pass1 = set()               # file paths fully consumed by Pass 1
    # --- Validation ---
    if not file_list or not config:
        log.warning("Classification skipped: Missing file list or config.")
        return {}
    # NOTE(review): the warnings below say "skipped"/"Cannot classify" but
    # execution continues using the empty getattr() fallbacks — confirm intended.
    if not hasattr(config, 'compiled_map_keyword_regex') or not config.compiled_map_keyword_regex:
        log.warning("Classification skipped: Missing compiled map keyword regex in config.")
    if not hasattr(config, 'compiled_extra_regex'):
        log.warning("Configuration object missing 'compiled_extra_regex'. Cannot classify extra files.")
    if not hasattr(config, 'compiled_bit_depth_regex_map'):
        log.warning("Configuration object missing 'compiled_bit_depth_regex_map'. Cannot prioritize bit depth variants.")
    compiled_map_regex = getattr(config, 'compiled_map_keyword_regex', {})
    compiled_extra_regex = getattr(config, 'compiled_extra_regex', [])
    compiled_bit_depth_regex_map = getattr(config, 'compiled_bit_depth_regex_map', {})
    num_map_rules = sum(len(patterns) for patterns in compiled_map_regex.values())
    num_extra_rules = len(compiled_extra_regex)
    num_bit_depth_rules = len(compiled_bit_depth_regex_map)
    log.debug(f"Starting classification for {len(file_list)} files using {num_map_rules} map keyword patterns, {num_bit_depth_rules} bit depth patterns, and {num_extra_rules} extra patterns.")
    # --- Asset Name Extraction Helper ---
    def get_asset_name(f_path: Path, cfg: Configuration) -> str:
        """Extract the asset (base) name from a file path using the preset's
        separator/index rules, falling back to a '_'-split or the raw stem."""
        filename = f_path.name  # NOTE(review): unused locally; kept as-is.
        asset_name = None
        try:
            separator = cfg.source_naming_separator
            indices = cfg.source_naming_indices
            base_name_index = indices.get('base_name')
            if separator is not None and base_name_index is not None:
                stem = f_path.stem
                parts = stem.split(separator)
                if 0 <= base_name_index < len(parts):
                    asset_name = parts[base_name_index]
                else:
                    log.warning(f"Preset base_name index {base_name_index} out of bounds for '{stem}' split by '{separator}'. Falling back.")
            else:
                log.debug(f"Preset rules for asset name extraction incomplete (separator: {separator}, index: {base_name_index}). Falling back for '(unknown)'.")
            # Fallback: first '_'-separated token of the stem, or the whole stem.
            if not asset_name:
                asset_name = f_path.stem.split('_')[0] if '_' in f_path.stem else f_path.stem
                log.debug(f"Used fallback asset name extraction: '{asset_name}' for '(unknown)'.")
        except Exception as e:
            log.exception(f"Error extracting asset name for '(unknown)': {e}. Falling back to stem.")
            asset_name = f_path.stem
        # Last-resort guard against an empty extracted name.
        if not asset_name:
            asset_name = f_path.stem
            log.warning(f"Asset name extraction resulted in empty string for '(unknown)'. Using stem: '{asset_name}'.")
        return asset_name
    # --- Pass 1: Prioritized Bit Depth Variants ---
    log.debug("--- Starting Classification Pass 1: Prioritized Variants ---")
    for file_path_str in file_list:
        file_path = Path(file_path_str)
        filename = file_path.name
        asset_name = get_asset_name(file_path, config)
        processed = False  # NOTE(review): written but never read in this pass.
        for target_type, variant_regex in compiled_bit_depth_regex_map.items():
            match = variant_regex.search(filename)
            if match:
                log.debug(f"PASS 1: File '(unknown)' matched PRIORITIZED bit depth variant for type '{target_type}'.")
                matched_item_type = target_type
                is_gloss_flag = False
                if (asset_name, matched_item_type) in primary_assignments:
                    # Duplicate primary for this (asset, type): leave the file
                    # unconsumed so Pass 2 can classify it (typically as EXTRA).
                    log.warning(f"PASS 1: Primary assignment ({asset_name}, {matched_item_type}) already exists. File '(unknown)' will be handled in Pass 2.")
                else:
                    primary_assignments.add((asset_name, matched_item_type))
                    log.debug(f" PASS 1: Added primary assignment: ({asset_name}, {matched_item_type})")
                    primary_asset_names.add(asset_name)
                    temp_grouped_files[asset_name].append({
                        'file_path': file_path_str,
                        'item_type': matched_item_type,
                        'asset_name': asset_name,
                        'is_gloss_source': is_gloss_flag
                    })
                    processed_in_pass1.add(file_path_str)
                    processed = True
                break # Stop checking other variant patterns for this file
    log.debug(f"--- Finished Pass 1. Primary assignments made: {primary_assignments} ---")
    # --- Pass 2: Extras, General Maps, Ignores ---
    log.debug("--- Starting Classification Pass 2: Extras, General Maps, Ignores ---")
    for file_path_str in file_list:
        if file_path_str in processed_in_pass1:
            log.debug(f"PASS 2: Skipping '{Path(file_path_str).name}' (processed in Pass 1).")
            continue
        file_path = Path(file_path_str)
        filename = file_path.name
        asset_name = get_asset_name(file_path, config)
        is_extra = False
        is_map = False
        # 1. Check for Extra Files FIRST in Pass 2
        for extra_pattern in compiled_extra_regex:
            if extra_pattern.search(filename):
                log.debug(f"PASS 2: File '(unknown)' matched EXTRA pattern: {extra_pattern.pattern}")
                # Defer association: extras attach to the primary asset later.
                extra_files_to_associate.append((file_path_str, filename))
                is_extra = True
                break
        if is_extra:
            continue
        # 2. Check for General Map Files in Pass 2
        for target_type, patterns_list in compiled_map_regex.items():
            for compiled_regex, original_keyword, rule_index in patterns_list:
                match = compiled_regex.search(filename)
                if match:
                    is_gloss_flag = False
                    try:
                        # Pull is_gloss_source from the matching mapping rule.
                        map_type_mapping_list = config.map_type_mapping
                        matched_rule_details = map_type_mapping_list[rule_index]
                        is_gloss_flag = matched_rule_details.get('is_gloss_source', False)
                        log.debug(f" PASS 2: Match found! Rule Index: {rule_index}, Keyword: '{original_keyword}', Target: '{target_type}', Gloss: {is_gloss_flag}")
                    except Exception as e:
                        log.exception(f" PASS 2: Error accessing rule details for index {rule_index}: {e}")
                    # *** Crucial Check: Has a prioritized variant claimed this type? ***
                    if (asset_name, target_type) in primary_assignments:
                        log.debug(f"PASS 2: File '(unknown)' matched '{original_keyword}' for type '{target_type}', but primary already assigned via Pass 1. Classifying as EXTRA.")
                        matched_item_type = "EXTRA"
                        is_gloss_flag = False
                    else:
                        log.debug(f"PASS 2: File '(unknown)' matched '{original_keyword}' for item_type '{target_type}'.")
                        matched_item_type = target_type
                    temp_grouped_files[asset_name].append({
                        'file_path': file_path_str,
                        'item_type': matched_item_type,
                        'asset_name': asset_name,
                        'is_gloss_source': is_gloss_flag
                    })
                    is_map = True
                    break
            if is_map:
                break
        # 3. Handle Unmatched Files in Pass 2 (Not Extra, Not Map)
        if not is_extra and not is_map:
            log.debug(f"PASS 2: File '(unknown)' did not match any map/extra pattern. Grouping under asset '{asset_name}' as FILE_IGNORE.")
            temp_grouped_files[asset_name].append({
                'file_path': file_path_str,
                'item_type': "FILE_IGNORE",
                'asset_name': asset_name,
                'is_gloss_source': False
            })
    log.debug("--- Finished Pass 2 ---")
    # --- Determine Primary Asset Name for Extra Association (using Pass 1 results) ---
    final_primary_asset_name = None
    if primary_asset_names:
        # Collect the asset name of every Pass-1 primary file; the most common
        # name wins, with an alphabetical tie-break for determinism.
        primary_map_asset_names_pass1 = [
            f_info['asset_name']
            for asset_files in temp_grouped_files.values()
            for f_info in asset_files
            if f_info['asset_name'] in primary_asset_names and (f_info['asset_name'], f_info['item_type']) in primary_assignments
        ]
        if primary_map_asset_names_pass1:
            name_counts = Counter(primary_map_asset_names_pass1)
            most_common_names = name_counts.most_common()
            final_primary_asset_name = most_common_names[0][0]
            if len(most_common_names) > 1 and most_common_names[0][1] == most_common_names[1][1]:
                tied_names = sorted([name for name, count in most_common_names if count == most_common_names[0][1]])
                final_primary_asset_name = tied_names[0]
                log.warning(f"Multiple primary asset names tied for most common based on Pass 1: {tied_names}. Using '{final_primary_asset_name}' for associating extra files.")
            log.debug(f"Determined primary asset name for extras based on Pass 1 primary maps: '{final_primary_asset_name}'")
        else:
            log.warning("Primary asset names set (from Pass 1) was populated, but no corresponding groups found. Falling back.")
    if not final_primary_asset_name:
        # Fallback: attach extras to the alphabetically-first asset group.
        if temp_grouped_files and extra_files_to_associate:
            fallback_name = sorted(temp_grouped_files.keys())[0]
            final_primary_asset_name = fallback_name
            log.warning(f"No primary map files found in Pass 1. Associating extras with first group found alphabetically: '{final_primary_asset_name}'.")
        elif extra_files_to_associate:
            log.warning(f"Could not determine any asset name to associate {len(extra_files_to_associate)} extra file(s) with. They will be ignored.")
        else:
            log.debug("No primary asset name determined (no maps or extras found).")
    # --- Associate Extra Files (collected in Pass 2) ---
    if final_primary_asset_name and extra_files_to_associate:
        log.debug(f"Associating {len(extra_files_to_associate)} extra file(s) with primary asset '{final_primary_asset_name}'")
        for file_path_str, filename in extra_files_to_associate:
            # Avoid duplicating a path already present in the target group.
            if not any(f['file_path'] == file_path_str for f in temp_grouped_files[final_primary_asset_name]):
                temp_grouped_files[final_primary_asset_name].append({
                    'file_path': file_path_str,
                    'item_type': "EXTRA",
                    'asset_name': final_primary_asset_name,
                    'is_gloss_source': False
                })
            else:
                log.debug(f"Skipping duplicate association of extra file: (unknown)")
    elif extra_files_to_associate:
        # No asset to attach them to; extras are dropped (warned above).
        pass
    log.debug(f"Classification complete. Found {len(temp_grouped_files)} potential assets.")
    return dict(temp_grouped_files)
class RuleBasedPredictionHandler(BasePredictionHandler):
    """
    Handles running rule-based predictions in a separate thread using presets.
    Generates the initial SourceRule hierarchy based on file lists and presets.
    Inherits from BasePredictionHandler for common threading and signaling.
    """
    def __init__(self, input_source_identifier: str, original_input_paths: list[str], preset_name: str, parent: QObject = None):
        """
        Initializes the rule-based handler.

        Args:
            input_source_identifier: The unique identifier for the input source (e.g., file path).
            original_input_paths: List of absolute file paths extracted from the source.
            preset_name: The name of the preset configuration to use.
            parent: The parent QObject.
        """
        super().__init__(input_source_identifier, parent)
        self.original_input_paths = original_input_paths
        self.preset_name = preset_name
        # Per-run state: set at the start of run_prediction(), cleared in its
        # finally block so a stale request cannot block the next one.
        self._current_input_path = None
        self._current_file_list = None
        self._current_preset_name = None
    # Re-introduce run_prediction as the main slot to receive requests
    @Slot(str, list, str)
    def run_prediction(self, input_source_identifier: str, original_input_paths: list[str], preset_name: str):
        """
        Generates the initial SourceRule hierarchy for a given source identifier,
        file list, and preset name. Populates only overridable fields based on
        classification and preset defaults.

        This method is intended to be run in the handler's QThread.
        Uses the base class signals for reporting results/errors
        (prediction_ready / prediction_error / status_update, presumably
        defined on BasePredictionHandler — not visible in this file).
        """
        # Check if already running a prediction for a *different* source
        # Allow re-triggering for the *same* source if needed (e.g., preset changed)
        if self._is_running and self._current_input_path != input_source_identifier:
            log.warning(f"RuleBasedPredictionHandler is busy with '{self._current_input_path}'. Ignoring request for '{input_source_identifier}'.")
            return
        self._is_running = True
        self._is_cancelled = False
        self._current_input_path = input_source_identifier
        self._current_file_list = original_input_paths
        self._current_preset_name = preset_name
        log.info(f"Starting rule-based prediction for: {input_source_identifier} using preset: {preset_name}")
        self.status_update.emit(f"Starting analysis for '{Path(input_source_identifier).name}'...")
        source_rules_list = []
        try:
            if not BACKEND_AVAILABLE:
                raise RuntimeError("Backend/config modules not available. Cannot run prediction.")
            # A missing preset is not an error: emit an empty result and stop.
            if not preset_name:
                log.warning("No preset selected for prediction.")
                self.status_update.emit("No preset selected.")
                self.prediction_ready.emit(input_source_identifier, [])
                self._is_running = False
                return
            source_path = Path(input_source_identifier)
            if not source_path.exists():
                log.warning(f"Input source path does not exist: '{input_source_identifier}'. Skipping prediction.")
                raise FileNotFoundError(f"Input source path not found: {input_source_identifier}")
            # --- Load Configuration ---
            config = Configuration(preset_name)
            log.info(f"Successfully loaded configuration for preset '{preset_name}'.")
            # Cancellation is surfaced via RuntimeError so the outer except
            # routes it through prediction_error like any other failure.
            if self._is_cancelled: raise RuntimeError("Prediction cancelled before classification.")
            # --- Perform Classification ---
            self.status_update.emit(f"Classifying files for '{source_path.name}'...")
            try:
                classified_assets = classify_files(original_input_paths, config)
            except Exception as e:
                log.exception(f"Error during file classification for source '{input_source_identifier}': {e}")
                raise RuntimeError(f"Error classifying files: {e}") from e
            if self._is_cancelled: raise RuntimeError("Prediction cancelled after classification.")
            # No assets identified: emit an empty result and stop (not an error).
            if not classified_assets:
                log.warning(f"Classification yielded no assets for source '{input_source_identifier}'.")
                self.status_update.emit("No assets identified from files.")
                self.prediction_ready.emit(input_source_identifier, [])
                self._is_running = False
                return
            # --- Build the Hierarchy ---
            self.status_update.emit(f"Building rule hierarchy for '{source_path.name}'...")
            try:
                supplier_identifier = config.supplier_name
                source_rule = SourceRule(
                    input_path=input_source_identifier,
                    supplier_identifier=supplier_identifier,
                    preset_name=preset_name
                )
                asset_rules = []
                # NOTE(review): reaches into Configuration's private
                # _core_settings — consider a public accessor.
                file_type_definitions = config._core_settings.get('FILE_TYPE_DEFINITIONS', {})
                for asset_name, files_info in classified_assets.items():
                    if self._is_cancelled: raise RuntimeError("Prediction cancelled during hierarchy building (assets).")
                    if not files_info: continue
                    asset_category_rules = config.asset_category_rules
                    asset_type_definitions = config.get_asset_type_definitions()
                    asset_type_keys = list(asset_type_definitions.keys())
                    # Initialize predicted_asset_type using the validated default
                    predicted_asset_type = config.default_asset_category
                    log.debug(f"Asset '{asset_name}': Initial predicted_asset_type set to default: '{predicted_asset_type}'.")
                    # 1. Check asset_category_rules from preset
                    determined_by_rule = False
                    # Check for Model type based on file patterns
                    if "Model" in asset_type_keys:
                        model_patterns_regex = config.compiled_model_regex
                        for f_info in files_info:
                            if f_info['item_type'] in ["EXTRA", "FILE_IGNORE"]:
                                continue
                            file_path_obj = Path(f_info['file_path'])
                            for pattern_re in model_patterns_regex:
                                if pattern_re.search(file_path_obj.name):
                                    predicted_asset_type = "Model"
                                    determined_by_rule = True
                                    log.debug(f"Asset '{asset_name}' classified as 'Model' due to file '{file_path_obj.name}' matching pattern '{pattern_re.pattern}'.")
                                    break
                            if determined_by_rule:
                                break
                    # Check for Decal type based on keywords in asset name (if not already Model)
                    if not determined_by_rule and "Decal" in asset_type_keys:
                        decal_keywords = asset_category_rules.get('decal_keywords', [])
                        for keyword in decal_keywords:
                            # Ensure keyword is a string before trying to escape it
                            if isinstance(keyword, str) and keyword:
                                try:
                                    # Whole-word, case-insensitive match in the asset name.
                                    if re.search(r'\b' + re.escape(keyword) + r'\b', asset_name, re.IGNORECASE):
                                        predicted_asset_type = "Decal"
                                        determined_by_rule = True
                                        log.debug(f"Asset '{asset_name}' classified as 'Decal' due to keyword '{keyword}'.")
                                        break
                                except re.error as e_re:
                                    log.warning(f"Regex error with decal_keyword '{keyword}': {e_re}")
                        if determined_by_rule:
                            pass  # NOTE(review): no-op branch; kept as-is.
                    # 2. If not determined by specific rules, check for Surface (if not Model/Decal by rule)
                    if not determined_by_rule and predicted_asset_type == config.default_asset_category and "Surface" in asset_type_keys:
                        item_types_in_asset = {f_info['item_type'] for f_info in files_info}
                        # Ensure we are checking against standard map types from FILE_TYPE_DEFINITIONS
                        # This check is primarily for PBR texture sets.
                        material_indicators = {
                            ft_key for ft_key, ft_def in config.get_file_type_definitions_with_examples().items()
                            if ft_def.get('standard_type') and ft_def.get('standard_type') not in ["", "EXTRA", "FILE_IGNORE", "MODEL"]
                        }
                        # Add common direct standard types as well for robustness
                        material_indicators.update({"COL", "NRM", "ROUGH", "METAL", "AO", "DISP"})
                        has_material_map = False
                        for item_type in item_types_in_asset:
                            # Check if the item_type itself is a material indicator or its standard_type is
                            if item_type in material_indicators:
                                has_material_map = True
                                break
                            # Check standard type if item_type is a key in FILE_TYPE_DEFINITIONS
                            item_def = config.get_file_type_definitions_with_examples().get(item_type)
                            if item_def and item_def.get('standard_type') in material_indicators:
                                has_material_map = True
                                break
                        if has_material_map:
                            predicted_asset_type = "Surface"
                            log.debug(f"Asset '{asset_name}' classified as 'Surface' due to material indicators.")
                    # 3. Final validation: Ensure predicted_asset_type is a valid key.
                    if predicted_asset_type not in asset_type_keys:
                        log.warning(f"Derived AssetType '{predicted_asset_type}' for asset '{asset_name}' is not in ASSET_TYPE_DEFINITIONS. "
                                    f"Falling back to default: '{config.default_asset_category}'.")
                        predicted_asset_type = config.default_asset_category
                    asset_rule = AssetRule(asset_name=asset_name, asset_type=predicted_asset_type)
                    file_rules = []
                    for file_info in files_info:
                        if self._is_cancelled: raise RuntimeError("Prediction cancelled during hierarchy building (files).")
                        base_item_type = file_info['item_type']
                        target_asset_name_override = file_info['asset_name']
                        final_item_type = base_item_type
                        # Normalize bare map types (e.g. 'COL') to the 'MAP_*' key form.
                        if not base_item_type.startswith("MAP_") and base_item_type not in ["FILE_IGNORE", "EXTRA", "MODEL"]:
                            final_item_type = f"MAP_{base_item_type}"
                        # Demote anything not defined in FILE_TYPE_DEFINITIONS to FILE_IGNORE.
                        if file_type_definitions and final_item_type not in file_type_definitions and base_item_type not in ["FILE_IGNORE", "EXTRA"]:
                            log.warning(f"Predicted ItemType '{base_item_type}' (checked as '{final_item_type}') for file '{file_info['file_path']}' is not in FILE_TYPE_DEFINITIONS. Setting to FILE_IGNORE.")
                            final_item_type = "FILE_IGNORE"
                        is_gloss_source_value = file_info.get('is_gloss_source', False)
                        file_rule = FileRule(
                            file_path=file_info['file_path'],
                            item_type=final_item_type,
                            item_type_override=final_item_type,
                            target_asset_name_override=target_asset_name_override,
                            output_format_override=None,
                            is_gloss_source=is_gloss_source_value if isinstance(is_gloss_source_value, bool) else False,
                            resolution_override=None,
                            channel_merge_instructions={},
                        )
                        file_rules.append(file_rule)
                    asset_rule.files = file_rules
                    asset_rules.append(asset_rule)
                source_rule.assets = asset_rules
                source_rules_list.append(source_rule)
            except Exception as e:
                log.exception(f"Error building rule hierarchy for source '{input_source_identifier}': {e}")
                raise RuntimeError(f"Error building rule hierarchy: {e}") from e
            # --- Emit Success Signal ---
            log.info(f"Rule-based prediction finished successfully for '{input_source_identifier}'.")
            self.prediction_ready.emit(input_source_identifier, source_rules_list)
        except Exception as e:
            # --- Emit Error Signal ---
            log.exception(f"Error during rule-based prediction for '{input_source_identifier}': {e}")
            error_msg = f"Error analyzing '{Path(input_source_identifier).name}': {e}"
            self.prediction_error.emit(input_source_identifier, error_msg)
        finally:
            # Always release the busy flag and per-run state, even on error/cancel.
            self._is_running = False
            self._current_input_path = None
            self._current_file_list = None
            self._current_preset_name = None
            log.info(f"Finished rule-based prediction run for: {input_source_identifier}")
    def is_running(self) -> bool:
        """Returns True if the handler is currently processing a prediction request."""
        return self._is_running