Pre-Codebase-review commit :3
Codebase dedublication and Cleanup refactor Documentation updated as well Preferences update Removed testfiles from repository
This commit is contained in:
197
utils/prediction_utils.py
Normal file
197
utils/prediction_utils.py
Normal file
@@ -0,0 +1,197 @@
|
||||
# utils/prediction_utils.py
|
||||
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
# Assuming these imports based on project structure and task description
|
||||
from rule_structure import SourceRule, RuleSet, MapRule, AssetRule
|
||||
from configuration import load_preset # Assuming preset loading is handled here or similar
|
||||
# If RuleBasedPredictionHandler exists and is the intended mechanism:
|
||||
# from gui.rule_based_prediction_handler import RuleBasedPredictionHandler
|
||||
# Or, if we need to replicate its core logic:
|
||||
from utils.structure_analyzer import analyze_archive_structure # Hypothetical utility
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Regex to extract preset name (similar to monitor.py)
|
||||
# Matches "[PresetName]_anything.zip/rar/7z"
|
||||
PRESET_FILENAME_REGEX = re.compile(r"^\[?([a-zA-Z0-9_-]+)\]?_.*\.(zip|rar|7z)$", re.IGNORECASE)
|
||||
|
||||
class PredictionError(Exception):
|
||||
"""Custom exception for prediction failures."""
|
||||
pass
|
||||
|
||||
def generate_source_rule_from_archive(archive_path: Path, config: Dict[str, Any]) -> SourceRule:
|
||||
"""
|
||||
Generates a SourceRule hierarchy based on rules defined in a preset,
|
||||
determined by the archive filename.
|
||||
|
||||
Args:
|
||||
archive_path: Path to the input archive file.
|
||||
config: The loaded application configuration dictionary, expected
|
||||
to contain preset information or a way to load it.
|
||||
|
||||
Returns:
|
||||
The generated SourceRule hierarchy.
|
||||
|
||||
Raises:
|
||||
PredictionError: If the preset cannot be determined, loaded, or
|
||||
if rule generation fails.
|
||||
FileNotFoundError: If the archive_path does not exist.
|
||||
"""
|
||||
if not archive_path.is_file():
|
||||
raise FileNotFoundError(f"Archive file not found: {archive_path}")
|
||||
|
||||
log.debug(f"Generating SourceRule for archive: {archive_path.name}")
|
||||
|
||||
# --- 1. Extract Preset Name ---
|
||||
match = PRESET_FILENAME_REGEX.match(archive_path.name)
|
||||
if not match:
|
||||
raise PredictionError(f"Filename '{archive_path.name}' does not match expected format '[preset]_filename.ext'. Cannot determine preset.")
|
||||
|
||||
preset_name = match.group(1)
|
||||
log.info(f"Extracted preset name: '{preset_name}' from {archive_path.name}")
|
||||
|
||||
# --- 2. Load Preset Rules ---
|
||||
# Option A: Presets are pre-loaded in config (e.g., under 'presets' key)
|
||||
# preset_rules_dict = config.get('presets', {}).get(preset_name)
|
||||
# Option B: Load preset dynamically using a utility
|
||||
try:
|
||||
# Assuming load_preset takes the name and maybe the base config/path
|
||||
# Adjust based on the actual signature of load_preset
|
||||
preset_config = load_preset(preset_name) # This might need config path or dict
|
||||
if not preset_config:
|
||||
raise PredictionError(f"Preset '{preset_name}' configuration is empty or invalid.")
|
||||
# Assuming the preset config directly contains the RuleSet structure
|
||||
# or needs parsing into RuleSet. Let's assume it needs parsing.
|
||||
# This part is highly dependent on how presets are stored and loaded.
|
||||
# For now, let's assume preset_config IS the RuleSet dictionary.
|
||||
if not isinstance(preset_config.get('rules'), dict): # Basic validation
|
||||
raise PredictionError(f"Preset '{preset_name}' does not contain a valid 'rules' dictionary.")
|
||||
rule_set_dict = preset_config['rules']
|
||||
# We need to deserialize this dict into RuleSet object
|
||||
# Assuming RuleSet has a class method or similar for this
|
||||
rule_set = RuleSet.from_dict(rule_set_dict) # Placeholder for actual deserialization
|
||||
|
||||
except FileNotFoundError:
|
||||
raise PredictionError(f"Preset file for '{preset_name}' not found.")
|
||||
except Exception as e:
|
||||
log.exception(f"Failed to load or parse preset '{preset_name}': {e}")
|
||||
raise PredictionError(f"Failed to load or parse preset '{preset_name}': {e}")
|
||||
|
||||
if not rule_set:
|
||||
raise PredictionError(f"Failed to obtain RuleSet for preset '{preset_name}'.")
|
||||
|
||||
log.debug(f"Successfully loaded RuleSet for preset: {preset_name}")
|
||||
|
||||
# --- 3. Generate SourceRule (Simplified Rule-Based Approach) ---
|
||||
# This simulates what a RuleBasedPredictionHandler might do, but without
|
||||
# needing the actual extracted files for *this* step. The rules themselves
|
||||
# define the expected structure. The ProcessingEngine will later use this
|
||||
# rule against the actual extracted files.
|
||||
|
||||
# Create the root SourceRule based on the archive name and the loaded RuleSet
|
||||
# The actual structure (AssetRules, MapRules) comes directly from the RuleSet.
|
||||
# We might need to adapt the archive name slightly (e.g., remove preset prefix)
|
||||
# for the root node name, depending on desired output structure.
|
||||
root_name = archive_path.stem # Or further processing if needed
|
||||
source_rule = SourceRule(name=root_name, rule_set=rule_set)
|
||||
|
||||
# Potentially add logic here if basic archive structure analysis *is* needed
|
||||
# for rule generation (e.g., using utils.structure_analyzer if it exists)
|
||||
# analyze_archive_structure(archive_path, source_rule) # Example
|
||||
|
||||
log.info(f"Generated initial SourceRule for '{archive_path.name}' based on preset '{preset_name}'.")
|
||||
|
||||
# --- 4. Return SourceRule ---
|
||||
# No temporary workspace needed/created in this function based on current plan.
|
||||
# Cleanup is not required here.
|
||||
return source_rule
|
||||
|
||||
# Example Usage (Conceptual - requires actual config/presets)
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
log.info("Testing prediction_utils...")
|
||||
|
||||
# Create dummy files/config for testing
|
||||
dummy_archive = Path("./[TestPreset]_MyAsset.zip")
|
||||
dummy_archive.touch()
|
||||
|
||||
# Need a dummy preset file `Presets/TestPreset.json`
|
||||
preset_dir = Path(__file__).parent.parent / "Presets"
|
||||
preset_dir.mkdir(exist_ok=True)
|
||||
dummy_preset_path = preset_dir / "TestPreset.json"
|
||||
dummy_preset_content = """
|
||||
{
|
||||
"name": "TestPreset",
|
||||
"description": "A dummy preset for testing",
|
||||
"rules": {
|
||||
"map_rules": [
|
||||
{"pattern": ".*albedo.*", "map_type": "Albedo", "color_space": "sRGB"},
|
||||
{"pattern": ".*normal.*", "map_type": "Normal", "color_space": "Non-Color"}
|
||||
],
|
||||
"asset_rules": [
|
||||
{"pattern": ".*", "material_name": "{asset_name}"}
|
||||
]
|
||||
},
|
||||
"settings": {}
|
||||
}
|
||||
"""
|
||||
# Need RuleSet.from_dict implementation for this to work
|
||||
# try:
|
||||
# with open(dummy_preset_path, 'w') as f:
|
||||
# f.write(dummy_preset_content)
|
||||
# log.info(f"Created dummy preset: {dummy_preset_path}")
|
||||
|
||||
# # Dummy config - structure depends on actual implementation
|
||||
# dummy_config = {
|
||||
# 'paths': {'presets': str(preset_dir)},
|
||||
# # 'presets': { 'TestPreset': json.loads(dummy_preset_content) } # Alt if pre-loaded
|
||||
# }
|
||||
|
||||
# # Mock load_preset if it's complex
|
||||
# original_load_preset = load_preset
|
||||
# def mock_load_preset(name):
|
||||
# if name == "TestPreset":
|
||||
# import json
|
||||
# return json.loads(dummy_preset_content)
|
||||
# else:
|
||||
# raise FileNotFoundError
|
||||
# load_preset = mock_load_preset # Monkey patch
|
||||
|
||||
# # Mock RuleSet.from_dict
|
||||
# original_from_dict = RuleSet.from_dict
|
||||
# def mock_from_dict(data):
|
||||
# # Basic mock - replace with actual logic
|
||||
# mock_rule_set = RuleSet()
|
||||
# mock_rule_set.map_rules = [MapRule(**mr) for mr in data.get('map_rules', [])]
|
||||
# mock_rule_set.asset_rules = [AssetRule(**ar) for ar in data.get('asset_rules', [])]
|
||||
# return mock_rule_set
|
||||
# RuleSet.from_dict = mock_from_dict # Monkey patch
|
||||
|
||||
|
||||
# try:
|
||||
# generated_rule = generate_source_rule_from_archive(dummy_archive, dummy_config)
|
||||
# log.info(f"Successfully generated SourceRule: {generated_rule.name}")
|
||||
# log.info(f" RuleSet Map Rules: {len(generated_rule.rule_set.map_rules)}")
|
||||
# log.info(f" RuleSet Asset Rules: {len(generated_rule.rule_set.asset_rules)}")
|
||||
# # Add more detailed checks if needed
|
||||
# except (PredictionError, FileNotFoundError) as e:
|
||||
# log.error(f"Test failed: {e}")
|
||||
# except Exception as e:
|
||||
# log.exception("Unexpected error during test")
|
||||
|
||||
# finally:
|
||||
# # Clean up dummy files
|
||||
# if dummy_archive.exists():
|
||||
# dummy_archive.unlink()
|
||||
# if dummy_preset_path.exists():
|
||||
# dummy_preset_path.unlink()
|
||||
# # Restore mocked functions
|
||||
# load_preset = original_load_preset
|
||||
# RuleSet.from_dict = original_from_dict
|
||||
# log.info("Test cleanup complete.")
|
||||
|
||||
log.warning("Note: Main execution block is commented out as it requires specific implementations of load_preset and RuleSet.from_dict.")
|
||||
87
utils/workspace_utils.py
Normal file
87
utils/workspace_utils.py
Normal file
@@ -0,0 +1,87 @@
|
||||
# utils/workspace_utils.py
|
||||
|
||||
import tempfile
|
||||
import shutil
|
||||
import zipfile
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
# Get a logger for this module
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Define supported archive extensions (add more as needed, e.g., '.rar', '.7z')
|
||||
# Requires additional libraries like patoolib for non-zip formats.
|
||||
SUPPORTED_ARCHIVES = {'.zip'}
|
||||
|
||||
def prepare_processing_workspace(input_path_str: Union[str, Path]) -> Path:
|
||||
"""
|
||||
Prepares a temporary workspace for processing an asset source.
|
||||
|
||||
Handles copying directory contents or extracting supported archives
|
||||
into a unique temporary directory.
|
||||
|
||||
Args:
|
||||
input_path_str: The path (as a string or Path object) to the input
|
||||
directory or archive file.
|
||||
|
||||
Returns:
|
||||
The Path object representing the created temporary workspace directory.
|
||||
The caller is responsible for cleaning up this directory.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the input_path does not exist.
|
||||
ValueError: If the input_path is not a directory or a supported archive type.
|
||||
zipfile.BadZipFile: If a zip file is corrupted.
|
||||
OSError: If there are issues creating the temp directory or copying files.
|
||||
"""
|
||||
input_path = Path(input_path_str)
|
||||
log.info(f"Preparing workspace for input: {input_path}")
|
||||
|
||||
if not input_path.exists():
|
||||
raise FileNotFoundError(f"Input path does not exist: {input_path}")
|
||||
|
||||
# Create a secure temporary directory
|
||||
try:
|
||||
temp_workspace_dir = tempfile.mkdtemp(prefix="asset_proc_")
|
||||
prepared_workspace_path = Path(temp_workspace_dir)
|
||||
log.info(f"Created temporary workspace: {prepared_workspace_path}")
|
||||
except OSError as e:
|
||||
log.error(f"Failed to create temporary directory: {e}")
|
||||
raise # Re-raise the exception
|
||||
|
||||
try:
|
||||
# Check if input is directory or a supported archive file
|
||||
if input_path.is_dir():
|
||||
log.info(f"Input is a directory, copying contents to workspace: {input_path}")
|
||||
# Copy directory contents into the temp workspace
|
||||
shutil.copytree(input_path, prepared_workspace_path, dirs_exist_ok=True)
|
||||
elif input_path.is_file() and input_path.suffix.lower() in SUPPORTED_ARCHIVES:
|
||||
log.info(f"Input is a supported archive ({input_path.suffix}), extracting to workspace: {input_path}")
|
||||
if input_path.suffix.lower() == '.zip':
|
||||
with zipfile.ZipFile(input_path, 'r') as zip_ref:
|
||||
zip_ref.extractall(prepared_workspace_path)
|
||||
# Add elif blocks here for other archive types (e.g., using patoolib)
|
||||
# elif input_path.suffix.lower() in ['.rar', '.7z']:
|
||||
# import patoolib
|
||||
# patoolib.extract_archive(str(input_path), outdir=str(prepared_workspace_path))
|
||||
else:
|
||||
# This case should ideally not be reached if SUPPORTED_ARCHIVES is correct
|
||||
raise ValueError(f"Archive type {input_path.suffix} marked as supported but no extraction logic defined.")
|
||||
else:
|
||||
# Handle unsupported input types
|
||||
raise ValueError(f"Unsupported input type: {input_path}. Must be a directory or a supported archive ({', '.join(SUPPORTED_ARCHIVES)}).")
|
||||
|
||||
log.debug(f"Workspace preparation successful for: {input_path}")
|
||||
return prepared_workspace_path
|
||||
|
||||
except (FileNotFoundError, ValueError, zipfile.BadZipFile, OSError, ImportError) as e:
|
||||
# Clean up the created temp directory if preparation fails mid-way
|
||||
log.error(f"Error during workspace preparation for {input_path}: {e}. Cleaning up workspace.")
|
||||
if prepared_workspace_path.exists():
|
||||
try:
|
||||
shutil.rmtree(prepared_workspace_path)
|
||||
log.info(f"Cleaned up failed workspace: {prepared_workspace_path}")
|
||||
except OSError as cleanup_error:
|
||||
log.error(f"Failed to cleanup workspace {prepared_workspace_path} after error: {cleanup_error}")
|
||||
raise # Re-raise the original exception
|
||||
Reference in New Issue
Block a user