Asset-Frameworker/gui/prediction_handler.py

552 lines
31 KiB
Python

import logging
from pathlib import Path
import time
import os
import re
import tempfile
import zipfile
from collections import defaultdict, Counter
from typing import List, Dict, Any
# --- PySide6 Imports ---
from PySide6.QtCore import QObject, Slot # Keep QObject for parent type hint, Slot for classify_files if kept as method
# Removed Signal, QThread as they are handled by BasePredictionHandler or caller
# --- Backend Imports ---
import sys
script_dir = Path(__file__).parent
project_root = script_dir.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
try:
from configuration import Configuration, ConfigurationError
from rule_structure import SourceRule, AssetRule, FileRule
from .base_prediction_handler import BasePredictionHandler
BACKEND_AVAILABLE = True
except ImportError as e:
print(f"ERROR (RuleBasedPredictionHandler): Failed to import backend/config/base modules: {e}")
Configuration = None
load_base_config = None
ConfigurationError = Exception
SourceRule, AssetRule, FileRule = (None,)*3
BACKEND_AVAILABLE = False
log = logging.getLogger(__name__)
if not log.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(levelname)s (RuleBasedPredictHandler): %(message)s')
def classify_files(file_list: List[str], config: Configuration) -> Dict[str, List[Dict[str, Any]]]:
"""
Analyzes a list of files based on configuration rules to group them by asset
and determine initial file properties, applying prioritization based on
'priority_keywords' in map_type_mapping.
Args:
file_list: List of absolute file paths.
config: The loaded Configuration object containing naming rules.
Returns:
A dictionary grouping file information by predicted asset name.
Example:
{
'AssetName1': [
{'file_path': '/path/to/AssetName1_DISP16.png', 'item_type': 'MAP_DISP', 'asset_name': 'AssetName1'},
{'file_path': '/path/to/AssetName1_Color.png', 'item_type': 'MAP_COL', 'asset_name': 'AssetName1'}
],
# ... other assets
}
Files marked as "FILE_IGNORE" will also be included in the output.
Returns an empty dict if classification fails or no files are provided.
"""
classified_files_info: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
file_matches: Dict[str, List[Tuple[str, int, bool]]] = defaultdict(list) # {file_path: [(target_type, rule_index, is_priority), ...]}
files_to_ignore: Set[str] = set()
# --- DEBUG: Log the input file_list ---
log.info(f"DEBUG_ROO_CLASSIFY_INPUT: classify_files received file_list (len={len(file_list)}): {file_list}")
# --- END DEBUG ---
# --- Validation ---
if not file_list or not config:
log.warning("Classification skipped: Missing file list or config.")
return {}
if not hasattr(config, 'compiled_map_keyword_regex') or not config.compiled_map_keyword_regex:
log.warning("Classification skipped: Missing compiled map keyword regex in config.")
# Proceeding might still classify EXTRA/FILE_IGNORE if those rules exist
if not hasattr(config, 'compiled_extra_regex'):
log.warning("Configuration object missing 'compiled_extra_regex'. Cannot classify extra files.")
compiled_extra_regex = [] # Provide default to avoid errors
else:
compiled_extra_regex = getattr(config, 'compiled_extra_regex', [])
compiled_map_regex = getattr(config, 'compiled_map_keyword_regex', {})
# Note: compiled_bit_depth_regex_map is no longer used for primary classification logic here
num_map_rules = sum(len(patterns) for patterns in compiled_map_regex.values())
num_extra_rules = len(compiled_extra_regex)
log.debug(f"Starting classification for {len(file_list)} files using {num_map_rules} map keyword patterns and {num_extra_rules} extra patterns.")
# --- Asset Name Extraction Helper ---
def get_asset_name(f_path: Path, cfg: Configuration) -> str:
filename = f_path.name
asset_name = None
try:
separator = cfg.source_naming_separator
indices = cfg.source_naming_indices
base_name_index = indices.get('base_name')
if separator is not None and base_name_index is not None:
stem = f_path.stem
parts = stem.split(separator)
if 0 <= base_name_index < len(parts):
asset_name = parts[base_name_index]
else:
log.warning(f"Preset base_name index {base_name_index} out of bounds for '{stem}' split by '{separator}'. Falling back.")
else:
log.debug(f"Preset rules for asset name extraction incomplete (separator: {separator}, index: {base_name_index}). Falling back for '{filename}'.")
if not asset_name:
asset_name = f_path.stem.split('_')[0] if '_' in f_path.stem else f_path.stem
log.debug(f"Used fallback asset name extraction: '{asset_name}' for '{filename}'.")
except Exception as e:
log.exception(f"Error extracting asset name for '{filename}': {e}. Falling back to stem.")
asset_name = f_path.stem
if not asset_name:
asset_name = f_path.stem
log.warning(f"Asset name extraction resulted in empty string for '{filename}'. Using stem: '{asset_name}'.")
return asset_name
# --- Pass 1: Collect all potential matches for each file ---
# For each file, find all map_type_mapping rules it matches (both regular and priority keywords).
# Store the target_type, original rule_index, and whether it was a priority match.
log.debug("--- Starting Classification Pass 1: Collect Potential Matches ---")
file_matches: Dict[str, List[Tuple[str, int, bool]]] = defaultdict(list) # {file_path: [(target_type, rule_index, is_priority), ...]}
files_classified_as_extra: Set[str] = set() # Files already classified as EXTRA
compiled_map_regex = getattr(config, 'compiled_map_keyword_regex', {})
compiled_extra_regex = getattr(config, 'compiled_extra_regex', [])
for file_path_str in file_list:
file_path = Path(file_path_str)
filename = file_path.name
asset_name = get_asset_name(file_path, config)
if "BoucleChunky001" in file_path_str:
log.info(f"DEBUG_ROO: Processing file: {file_path_str}")
# Check for EXTRA files first
is_extra = False
for extra_pattern in compiled_extra_regex:
if extra_pattern.search(filename):
if "BoucleChunky001_DISP_1K_METALNESS.png" in filename and extra_pattern.search(filename):
log.info(f"DEBUG_ROO: EXTRA MATCH: File '{filename}' matched EXTRA pattern: {extra_pattern.pattern}")
log.debug(f"PASS 1: File '{filename}' matched EXTRA pattern: {extra_pattern.pattern}")
# For EXTRA, we assign it directly and don't check map rules for this file
classified_files_info[asset_name].append({
'file_path': file_path_str,
'item_type': "EXTRA",
'asset_name': asset_name
})
files_classified_as_extra.add(file_path_str)
is_extra = True
break
if "BoucleChunky001_DISP_1K_METALNESS.png" in filename and not is_extra: # after the extra loop
log.info(f"DEBUG_ROO: EXTRA CHECK FAILED for {filename}. is_extra: {is_extra}")
if "BoucleChunky001_DISP_1K_METALNESS.png" in filename and not is_extra:
log.info(f"DEBUG_ROO: EXTRA CHECK FAILED for {filename}. is_extra: {is_extra}")
if is_extra:
continue # Move to the next file
# If not EXTRA, check for MAP matches (collect all potential matches)
for target_type, patterns_list in compiled_map_regex.items():
for compiled_regex, original_keyword, rule_index, is_priority in patterns_list:
match = compiled_regex.search(filename)
if match:
if "BoucleChunky001" in file_path_str:
log.info(f"DEBUG_ROO: PASS 1 MAP MATCH: File '{filename}' matched keyword '{original_keyword}' (priority: {is_priority}) for target type '{target_type}' (Rule Index: {rule_index}).")
log.debug(f" PASS 1: File '{filename}' matched keyword '{original_keyword}' (priority: {is_priority}) for target type '{target_type}' (Rule Index: {rule_index}).")
file_matches[file_path_str].append((target_type, rule_index, is_priority))
log.debug(f"--- Finished Pass 1. Collected matches for {len(file_matches)} files. ---")
# --- Pass 2: Determine Trumped Regular Matches ---
# Identify which regular matches are trumped by a priority match for the same rule_index within the asset.
log.debug("--- Starting Classification Pass 2: Determine Trumped Regular Matches ---")
trumped_regular_matches: Set[Tuple[str, int]] = set() # Set of (file_path_str, rule_index) pairs that are trumped
# First, determine which rule_indices have *any* priority match across the entire asset
rule_index_has_priority_match_in_asset: Set[int] = set()
for file_path_str, matches in file_matches.items():
for match_target, match_rule_index, match_is_priority in matches:
if match_is_priority:
rule_index_has_priority_match_in_asset.add(match_rule_index)
log.debug(f" Rule indices with priority matches in asset: {sorted(list(rule_index_has_priority_match_in_asset))}")
# Then, for each file, check its matches against the rules that had priority matches
for file_path_str in file_list:
if file_path_str in files_classified_as_extra:
continue
matches_for_this_file = file_matches.get(file_path_str, [])
# Determine if this file has any priority match for a given rule_index
file_has_priority_match_for_rule: Dict[int, bool] = defaultdict(bool)
for match_target, match_rule_index, match_is_priority in matches_for_this_file:
if match_is_priority:
file_has_priority_match_for_rule[match_rule_index] = True
# Determine if this file has any regular match for a given rule_index
file_has_regular_match_for_rule: Dict[int, bool] = defaultdict(bool)
for match_target, match_rule_index, match_is_priority in matches_for_this_file:
if not match_is_priority:
file_has_regular_match_for_rule[match_rule_index] = True
# Identify trumped regular matches for this file
for match_target, match_rule_index, match_is_priority in matches_for_this_file:
if not match_is_priority: # Only consider regular matches
if match_rule_index in rule_index_has_priority_match_in_asset:
# This regular match is for a rule_index that had a priority match somewhere in the asset
if not file_has_priority_match_for_rule[match_rule_index]:
# And this specific file did NOT have a priority match for this rule_index
trumped_regular_matches.add((file_path_str, match_rule_index))
log.debug(f" File '{Path(file_path_str).name}': Regular match for Rule Index {match_rule_index} is trumped.")
if "BoucleChunky001" in file_path_str:
log.info(f"DEBUG_ROO: TRUMPED: File '{Path(file_path_str).name}': Regular match for Rule Index {match_rule_index} (target {match_target}) is trumped.")
if "BoucleChunky001" in file_path_str: # Check if it was actually added by checking the set, or just log if the condition was met
if (file_path_str, match_rule_index) in trumped_regular_matches:
log.info(f"DEBUG_ROO: TRUMPED: File '{Path(file_path_str).name}': Regular match for Rule Index {match_rule_index} (target {match_target}) is trumped.")
log.debug(f"--- Finished Pass 2. Identified {len(trumped_regular_matches)} trumped regular matches. ---")
# --- Pass 3: Final Assignment & Inter-Entry Resolution ---
# Iterate through files, apply ignore rules, and then apply earliest rule wins for remaining valid matches.
log.debug("--- Starting Classification Pass 3: Final Assignment ---")
final_file_assignments: Dict[str, str] = {} # {file_path: final_item_type}
for file_path_str in file_list:
# Check if the file was already classified as EXTRA in Pass 1 and added to classified_files_info
if file_path_str in files_classified_as_extra:
log.debug(f" Final Assignment: Skipping '{Path(file_path_str).name}' as it was already classified as EXTRA in Pass 1.")
continue # Skip this file in Pass 3 as it's already handled
asset_name = get_asset_name(Path(file_path_str), config) # Need asset name for the final output structure
# Get valid matches for this file after considering intra-entry priority trumps regular
valid_matches = []
for match_target, match_rule_index, match_is_priority in file_matches.get(file_path_str, []):
if (file_path_str, match_rule_index) not in trumped_regular_matches:
valid_matches.append((match_target, match_rule_index, match_is_priority))
log.debug(f" File '{Path(file_path_str).name}': Valid match - Target: '{match_target}', Rule Index: {match_rule_index}, Priority: {match_is_priority}")
else:
log.debug(f" File '{Path(file_path_str).name}': Invalid match (trumped by priority) - Target: '{match_target}', Rule Index: {match_rule_index}, Priority: {match_is_priority}")
if "BoucleChunky001" in file_path_str:
log.info(f"DEBUG_ROO: PASS 3 PRE-ASSIGN: File '{Path(file_path_str).name}'. Valid matches: {valid_matches}")
if "BoucleChunky001" in file_path_str:
log.info(f"DEBUG_ROO: PASS 3 PRE-ASSIGN: File '{Path(file_path_str).name}'. Valid matches: {valid_matches}")
final_item_type = "FILE_IGNORE" # Default to ignore if no valid matches
if valid_matches:
# Apply earliest rule wins among valid matches
best_match = min(valid_matches, key=lambda x: x[1]) # Find match with lowest rule_index
final_item_type = best_match[0] # Assign the target_type of the best match
log.debug(f" File '{Path(file_path_str).name}': Best valid match -> Target: '{best_match[0]}', Rule Index: {best_match[1]}. Final type: '{final_item_type}'.")
else:
log.debug(f" File '{Path(file_path_str).name}'': No valid matches after filtering. Final type: '{final_item_type}'.")
if "BoucleChunky001" in file_path_str:
log.info(f"DEBUG_ROO: PASS 3 FINAL ASSIGN: File '{Path(file_path_str).name}' -> Final Type: '{final_item_type}'")
final_file_assignments[file_path_str] = final_item_type
if "BoucleChunky001" in file_path_str:
log.info(f"DEBUG_ROO: PASS 3 FINAL ASSIGN: File '{Path(file_path_str).name}' -> Final Type: '{final_item_type}'")
# Add the file info to the classified_files_info structure
log.info(f"DEBUG_ROO: PASS 3 APPEND: Appending file '{Path(file_path_str).name}' with type '{final_item_type}' to classified_files_info['{asset_name}']")
classified_files_info[asset_name].append({
'file_path': file_path_str,
'item_type': final_item_type,
'asset_name': asset_name
})
log.debug(f" Final Grouping: '{Path(file_path_str).name}' -> '{final_item_type}' (Asset: '{asset_name}')")
log.debug(f"Classification complete. Found {len(classified_files_info)} potential assets.")
# Enhanced logging for the content of classified_files_info
boucle_chunky_data = {
key: val for key, val in classified_files_info.items()
if 'BoucleChunky001' in key or any('BoucleChunky001' in (f_info.get('file_path','')) for f_info in val)
}
import json # Make sure json is imported if not already at top of file
log.info(f"DEBUG_ROO: Final classified_files_info for BoucleChunky001 (content): \n{json.dumps(boucle_chunky_data, indent=2)}")
return dict(classified_files_info)
class RuleBasedPredictionHandler(BasePredictionHandler):
"""
Handles running rule-based predictions in a separate thread using presets.
Generates the initial SourceRule hierarchy based on file lists and presets.
Inherits from BasePredictionHandler for common threading and signaling.
"""
def __init__(self, input_source_identifier: str, original_input_paths: list[str], preset_name: str, parent: QObject = None):
"""
Initializes the rule-based handler.
Args:
input_source_identifier: The unique identifier for the input source (e.g., file path).
original_input_paths: List of absolute file paths extracted from the source.
preset_name: The name of the preset configuration to use.
parent: The parent QObject.
"""
super().__init__(input_source_identifier, parent)
self.original_input_paths = original_input_paths
self.preset_name = preset_name
self._current_input_path = None
self._current_file_list = None
self._current_preset_name = None
# Re-introduce run_prediction as the main slot to receive requests
@Slot(str, list, str)
def run_prediction(self, input_source_identifier: str, original_input_paths: list[str], preset_name: str):
"""
Generates the initial SourceRule hierarchy for a given source identifier,
file list, and preset name. Populates only overridable fields based on
classification and preset defaults.
This method is intended to be run in the handler's QThread.
Uses the base class signals for reporting results/errors.
"""
# Check if already running a prediction for a *different* source
# Allow re-triggering for the *same* source if needed (e.g., preset changed)
if self._is_running and self._current_input_path != input_source_identifier:
log.warning(f"RuleBasedPredictionHandler is busy with '{self._current_input_path}'. Ignoring request for '{input_source_identifier}'.")
return
self._is_running = True
self._is_cancelled = False
self._current_input_path = input_source_identifier
self._current_file_list = original_input_paths
self._current_preset_name = preset_name
log.info(f"Starting rule-based prediction for: {input_source_identifier} using preset: {preset_name}")
self.status_update.emit(f"Starting analysis for '{Path(input_source_identifier).name}'...")
source_rules_list = []
try:
if not BACKEND_AVAILABLE:
raise RuntimeError("Backend/config modules not available. Cannot run prediction.")
if not preset_name:
log.warning("No preset selected for prediction.")
self.status_update.emit("No preset selected.")
self.prediction_ready.emit(input_source_identifier, [])
self._is_running = False
return
source_path = Path(input_source_identifier)
if not source_path.exists():
log.warning(f"Input source path does not exist: '{input_source_identifier}'. Skipping prediction.")
raise FileNotFoundError(f"Input source path not found: {input_source_identifier}")
# --- Load Configuration ---
config = Configuration(preset_name)
log.info(f"Successfully loaded configuration for preset '{preset_name}'.")
if self._is_cancelled: raise RuntimeError("Prediction cancelled before classification.")
# --- Perform Classification ---
self.status_update.emit(f"Classifying files for '{source_path.name}'...")
try:
classified_assets = classify_files(original_input_paths, config)
except Exception as e:
log.exception(f"Error during file classification for source '{input_source_identifier}': {e}")
raise RuntimeError(f"Error classifying files: {e}") from e
if self._is_cancelled: raise RuntimeError("Prediction cancelled after classification.")
if not classified_assets:
log.warning(f"Classification yielded no assets for source '{input_source_identifier}'.")
self.status_update.emit("No assets identified from files.")
self.prediction_ready.emit(input_source_identifier, [])
self._is_running = False
return
# --- Build the Hierarchy ---
self.status_update.emit(f"Building rule hierarchy for '{source_path.name}'...")
try:
supplier_identifier = config.supplier_name
source_rule = SourceRule(
input_path=input_source_identifier,
supplier_identifier=supplier_identifier,
# Use the internal display name from the config object
preset_name=config.internal_display_preset_name
)
asset_rules = []
file_type_definitions = config._core_settings.get('FILE_TYPE_DEFINITIONS', {})
for asset_name, files_info in classified_assets.items():
if self._is_cancelled: raise RuntimeError("Prediction cancelled during hierarchy building (assets).")
if not files_info: continue
asset_category_rules = config.asset_category_rules
asset_type_definitions = config.get_asset_type_definitions()
asset_type_keys = list(asset_type_definitions.keys())
# Initialize predicted_asset_type using the validated default
predicted_asset_type = config.default_asset_category
log.debug(f"Asset '{asset_name}': Initial predicted_asset_type set to default: '{predicted_asset_type}'.")
# 1. Check asset_category_rules from preset
determined_by_rule = False
# Check for Model type based on file patterns
if "Model" in asset_type_keys:
model_patterns_regex = config.compiled_model_regex
for f_info in files_info:
if f_info['item_type'] in ["EXTRA", "FILE_IGNORE"]:
continue
file_path_obj = Path(f_info['file_path'])
for pattern_re in model_patterns_regex:
if pattern_re.search(file_path_obj.name):
predicted_asset_type = "Model"
determined_by_rule = True
log.debug(f"Asset '{asset_name}' classified as 'Model' due to file '{file_path_obj.name}' matching pattern '{pattern_re.pattern}'.")
break
if determined_by_rule:
break
# Check for Decal type based on keywords in asset name (if not already Model)
if not determined_by_rule and "Decal" in asset_type_keys:
decal_keywords = asset_category_rules.get('decal_keywords', [])
for keyword in decal_keywords:
# Ensure keyword is a string before trying to escape it
if isinstance(keyword, str) and keyword:
try:
if re.search(r'\b' + re.escape(keyword) + r'\b', asset_name, re.IGNORECASE):
predicted_asset_type = "Decal"
determined_by_rule = True
log.debug(f"Asset '{asset_name}' classified as 'Decal' due to keyword '{keyword}'.")
break
except re.error as e_re:
log.warning(f"Regex error with decal_keyword '{keyword}': {e_re}")
if determined_by_rule:
pass
# 2. If not determined by specific rules, check for Surface (if not Model/Decal by rule)
if not determined_by_rule and predicted_asset_type == config.default_asset_category and "Surface" in asset_type_keys:
item_types_in_asset = {f_info['item_type'] for f_info in files_info}
# Ensure we are checking against standard map types from FILE_TYPE_DEFINITIONS
# This check is primarily for PBR texture sets.
material_indicators = {
ft_key for ft_key, ft_def in config.get_file_type_definitions_with_examples().items()
if ft_def.get('standard_type') and ft_def.get('standard_type') not in ["", "EXTRA", "FILE_IGNORE", "MODEL"]
}
# Add common direct standard types as well for robustness
material_indicators.update({"COL", "NRM", "ROUGH", "METAL", "AO", "DISP"})
has_material_map = False
for item_type in item_types_in_asset:
# Check if the item_type itself is a material indicator or its standard_type is
if item_type in material_indicators:
has_material_map = True
break
# Check standard type if item_type is a key in FILE_TYPE_DEFINITIONS
item_def = config.get_file_type_definitions_with_examples().get(item_type)
if item_def and item_def.get('standard_type') in material_indicators:
has_material_map = True
break
if has_material_map:
predicted_asset_type = "Surface"
log.debug(f"Asset '{asset_name}' classified as 'Surface' due to material indicators.")
# 3. Final validation: Ensure predicted_asset_type is a valid key.
if predicted_asset_type not in asset_type_keys:
log.warning(f"Derived AssetType '{predicted_asset_type}' for asset '{asset_name}' is not in ASSET_TYPE_DEFINITIONS. "
f"Falling back to default: '{config.default_asset_category}'.")
predicted_asset_type = config.default_asset_category
asset_rule = AssetRule(asset_name=asset_name, asset_type=predicted_asset_type)
file_rules = []
for file_info in files_info:
if self._is_cancelled: raise RuntimeError("Prediction cancelled during hierarchy building (files).")
base_item_type = file_info['item_type']
target_asset_name_override = file_info['asset_name']
final_item_type = base_item_type
# The classification logic now returns the final item_type directly,
# including "FILE_IGNORE" and correctly prioritized MAP_ types.
# No need for the old MAP_ prefixing logic here.
# Validate the final_item_type against definitions, unless it's EXTRA or FILE_IGNORE
if final_item_type not in ["EXTRA", "FILE_IGNORE"] and file_type_definitions and final_item_type not in file_type_definitions:
log.warning(f"Predicted ItemType '{final_item_type}' for file '{file_info['file_path']}' is not in FILE_TYPE_DEFINITIONS. Setting to FILE_IGNORE.")
final_item_type = "FILE_IGNORE"
file_rule = FileRule(
file_path=file_info['file_path'],
item_type=final_item_type,
item_type_override=final_item_type, # item_type_override defaults to item_type
target_asset_name_override=target_asset_name_override,
output_format_override=None,
resolution_override=None,
channel_merge_instructions={},
)
file_rules.append(file_rule)
asset_rule.files = file_rules
asset_rules.append(asset_rule)
source_rule.assets = asset_rules
source_rules_list.append(source_rule)
# DEBUG: Log the structure of the source_rule being emitted
if source_rule and source_rule.assets:
for asset_r_idx, asset_r in enumerate(source_rule.assets):
log.info(f"DEBUG_ROO_EMIT: Source '{input_source_identifier}', Asset {asset_r_idx} ('{asset_r.asset_name}') has {len(asset_r.files)} FileRules.")
for fr_idx, fr in enumerate(asset_r.files):
log.info(f"DEBUG_ROO_EMIT: FR {fr_idx}: Path='{fr.file_path}', Type='{fr.item_type}', TargetAsset='{fr.target_asset_name_override}'")
elif source_rule:
log.info(f"DEBUG_ROO_EMIT: Emitting SourceRule for {input_source_identifier} but it has no assets.")
else:
log.info(f"DEBUG_ROO_EMIT: Attempting to emit for {input_source_identifier}, but source_rule object is None.")
# END DEBUG
except Exception as e:
log.exception(f"Error building rule hierarchy for source '{input_source_identifier}': {e}")
raise RuntimeError(f"Error building rule hierarchy: {e}") from e
# --- Emit Success Signal ---
log.info(f"Rule-based prediction finished successfully for '{input_source_identifier}'.")
self.prediction_ready.emit(input_source_identifier, source_rules_list)
except Exception as e:
# --- Emit Error Signal ---
log.exception(f"Error during rule-based prediction for '{input_source_identifier}': {e}")
error_msg = f"Error analyzing '{Path(input_source_identifier).name}': {e}"
self.prediction_error.emit(input_source_identifier, error_msg)
finally:
self._is_running = False
self._current_input_path = None
self._current_file_list = None
self._current_preset_name = None
log.info(f"Finished rule-based prediction run for: {input_source_identifier}")
def is_running(self) -> bool:
"""Returns True if the handler is currently processing a prediction request."""
return self._is_running