# Codebase deduplication and cleanup refactor; documentation updated as well;
# preferences updated; removed test files from the repository.
# utils/prediction_utils.py

import logging
import re
from pathlib import Path
from typing import Any, Dict, Optional

# Project-local imports (assumed from project structure and task description).
from configuration import load_preset  # Assuming preset loading is handled here or similar
from rule_structure import AssetRule, MapRule, RuleSet, SourceRule

# If RuleBasedPredictionHandler exists and is the intended mechanism:
# from gui.rule_based_prediction_handler import RuleBasedPredictionHandler
# Or, if we need to replicate its core logic:
from utils.structure_analyzer import analyze_archive_structure  # Hypothetical utility
# Module-level logger, named after this module per logging convention.
log = logging.getLogger(__name__)

# Regex to extract the preset name from an archive filename (similar to monitor.py).
# Matches "[PresetName]_anything.zip/rar/7z"; the square brackets are optional
# because both `\[?` and `\]?` are optional in the pattern.
# NOTE(review): for an unbracketed name containing underscores
# (e.g. "My_Preset_file.zip") the greedy capture group grabs everything up to
# the LAST underscore before the extension — confirm that is intended.
PRESET_FILENAME_REGEX = re.compile(r"^\[?([a-zA-Z0-9_-]+)\]?_.*\.(zip|rar|7z)$", re.IGNORECASE)
class PredictionError(Exception):
    """Raised when a SourceRule cannot be predicted for an archive.

    Covers failures to determine, load, or parse the preset as well as
    rule-generation errors.
    """
def generate_source_rule_from_archive(archive_path: Path, config: Dict[str, Any]) -> SourceRule:
    """
    Generates a SourceRule hierarchy based on rules defined in a preset,
    determined by the archive filename.

    The preset name is extracted from the filename (expected format
    ``[preset]_filename.ext``), its configuration is loaded via
    ``load_preset``, its ``rules`` dictionary is deserialized into a
    ``RuleSet``, and a root ``SourceRule`` named after the archive stem is
    built from that rule set.

    Args:
        archive_path: Path to the input archive file.
        config: The loaded application configuration dictionary, expected
            to contain preset information or a way to load it.
            NOTE(review): currently unused here — ``load_preset`` is called
            with the preset name only; confirm its actual signature.

    Returns:
        The generated SourceRule hierarchy.

    Raises:
        PredictionError: If the preset cannot be determined, loaded, or
            if rule generation fails.
        FileNotFoundError: If the archive_path does not exist.
    """
    if not archive_path.is_file():
        raise FileNotFoundError(f"Archive file not found: {archive_path}")

    log.debug("Generating SourceRule for archive: %s", archive_path.name)

    # --- 1. Extract Preset Name ---
    match = PRESET_FILENAME_REGEX.match(archive_path.name)
    if not match:
        raise PredictionError(
            f"Filename '{archive_path.name}' does not match expected format "
            f"'[preset]_filename.ext'. Cannot determine preset."
        )

    preset_name = match.group(1)
    log.info("Extracted preset name: '%s' from %s", preset_name, archive_path.name)

    # --- 2. Load Preset Rules ---
    # Presets are loaded dynamically via load_preset(); adjust the call if
    # the real signature also needs the config dict or a base path.
    try:
        preset_config = load_preset(preset_name)
        if not preset_config:
            raise PredictionError(f"Preset '{preset_name}' configuration is empty or invalid.")
        # How presets are stored is project-dependent; here the preset config
        # is assumed to carry the RuleSet structure under a 'rules' key.
        if not isinstance(preset_config.get('rules'), dict):  # Basic validation
            raise PredictionError(f"Preset '{preset_name}' does not contain a valid 'rules' dictionary.")
        rule_set_dict = preset_config['rules']
        # Deserialize the dict into a RuleSet object.
        rule_set = RuleSet.from_dict(rule_set_dict)  # Placeholder for actual deserialization
    except PredictionError:
        # BUGFIX: our own validation errors were previously caught by the
        # generic handler below and double-wrapped; re-raise them untouched.
        raise
    except FileNotFoundError as e:
        raise PredictionError(f"Preset file for '{preset_name}' not found.") from e
    except Exception as e:
        log.exception("Failed to load or parse preset '%s': %s", preset_name, e)
        # Chain the original cause so the traceback stays diagnosable.
        raise PredictionError(f"Failed to load or parse preset '{preset_name}': {e}") from e

    # Guard against a deserializer that returns a falsy/empty RuleSet.
    if not rule_set:
        raise PredictionError(f"Failed to obtain RuleSet for preset '{preset_name}'.")

    log.debug("Successfully loaded RuleSet for preset: %s", preset_name)

    # --- 3. Generate SourceRule (Simplified Rule-Based Approach) ---
    # The rules themselves define the expected structure; no extraction
    # happens here. The ProcessingEngine later applies this rule against
    # the actually extracted files.
    root_name = archive_path.stem  # Adjust here if the preset prefix should be stripped.
    source_rule = SourceRule(name=root_name, rule_set=rule_set)

    # Hook for optional basic archive-structure analysis, if rule generation
    # ever needs it:
    # analyze_archive_structure(archive_path, source_rule)  # Example

    log.info(
        "Generated initial SourceRule for '%s' based on preset '%s'.",
        archive_path.name,
        preset_name,
    )

    # --- 4. Return SourceRule ---
    # No temporary workspace is created in this function, so no cleanup is required.
    return source_rule
# Example usage (conceptual — requires actual config/presets).
if __name__ == '__main__':
    # Smoke-test scaffold. The end-to-end call stays disabled because it
    # requires concrete implementations of load_preset and RuleSet.from_dict.
    logging.basicConfig(level=logging.DEBUG)
    log.info("Testing prediction_utils...")

    # Dummy fixtures: an archive name matching PRESET_FILENAME_REGEX and the
    # preset file it would resolve to (Presets/TestPreset.json).
    dummy_archive = Path("./[TestPreset]_MyAsset.zip")
    preset_dir = Path(__file__).parent.parent / "Presets"
    dummy_preset_path = preset_dir / "TestPreset.json"
    dummy_preset_content = """
{
    "name": "TestPreset",
    "description": "A dummy preset for testing",
    "rules": {
        "map_rules": [
            {"pattern": ".*albedo.*", "map_type": "Albedo", "color_space": "sRGB"},
            {"pattern": ".*normal.*", "map_type": "Normal", "color_space": "Non-Color"}
        ],
        "asset_rules": [
            {"pattern": ".*", "material_name": "{asset_name}"}
        ]
    },
    "settings": {}
}
"""

    try:
        dummy_archive.touch()
        preset_dir.mkdir(exist_ok=True)
        dummy_preset_path.write_text(dummy_preset_content)
        log.info("Created dummy preset: %s", dummy_preset_path)

        # Conceptual flow, once load_preset / RuleSet.from_dict exist:
        #   generated_rule = generate_source_rule_from_archive(dummy_archive, {})
        #   log.info(f"Generated SourceRule: {generated_rule.name}")
        #   log.info(f"  RuleSet Map Rules: {len(generated_rule.rule_set.map_rules)}")
        #   log.info(f"  RuleSet Asset Rules: {len(generated_rule.rule_set.asset_rules)}")
        log.warning(
            "Note: End-to-end prediction is skipped as it requires specific "
            "implementations of load_preset and RuleSet.from_dict."
        )
    finally:
        # BUGFIX: the dummy archive and preset file were previously created
        # but never removed (cleanup was commented out), leaving test
        # artifacts behind on every run.
        if dummy_archive.exists():
            dummy_archive.unlink()
        if dummy_preset_path.exists():
            dummy_preset_path.unlink()
        log.info("Test cleanup complete.")