import os
import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List

import requests
from PySide6.QtCore import QObject, Slot  # QObject for parent type hint

# Project-local imports: rule structures produced by the prediction, and the
# base handler that supplies run()/cancel() plus the prediction_ready /
# prediction_error / status_update signals.
from rule_structure import SourceRule, AssetRule, FileRule
from .base_prediction_handler import BasePredictionHandler

log = logging.getLogger(__name__)


class LLMPredictionHandler(BasePredictionHandler):
    """
    Handles the interaction with an LLM for predicting asset structures
    based on a directory's file list. Inherits from BasePredictionHandler.

    The base class owns the threading/cancellation plumbing (run(), cancel(),
    _is_cancelled) and the Qt signals; this class only implements the core
    logic in _perform_prediction().
    """

    def __init__(self, input_source_identifier: str, file_list: list,
                 llm_settings: dict, parent: QObject = None):
        """
        Initializes the LLM handler.

        Args:
            input_source_identifier: The unique identifier for the input
                source (e.g., file path). Stored by the base class as
                self.input_source_identifier.
            file_list: A list of *relative* file paths extracted from the
                input source (the prompt template expects relative paths).
            llm_settings: A dictionary containing necessary LLM configuration
                (endpoint_url, api_key, prompt_template_content, etc.).
            parent: The parent QObject.
        """
        super().__init__(input_source_identifier, parent)
        self.file_list = file_list          # relative file list for the prompt
        self.llm_settings = llm_settings    # full settings dictionary
        self.endpoint_url = self.llm_settings.get('llm_endpoint_url')
        self.api_key = self.llm_settings.get('llm_api_key')

    def _perform_prediction(self) -> List[SourceRule]:
        """
        Performs the LLM prediction by preparing the prompt, calling the LLM,
        and parsing the response. Implements the abstract method from
        BasePredictionHandler.

        Returns:
            A list containing a single SourceRule object based on the LLM
            response, or an empty list if prediction fails or yields no
            results (or was cancelled).

        Raises:
            ValueError: If required settings (endpoint URL, prompt template)
                are missing, or prompt preparation / response parsing fails.
            RuntimeError: If the LLM API call fails.
        """
        log.info(f"Performing LLM prediction for: {self.input_source_identifier}")
        base_name = Path(self.input_source_identifier).name

        # Nothing to predict on: report via status signal, but this is not
        # an error condition -- return an empty result.
        if not self.file_list:
            log.warning(
                f"No files provided for LLM prediction for "
                f"{self.input_source_identifier}. Returning empty list.")
            self.status_update.emit(f"No files found for {base_name}.")
            return []

        # Cancellation is checked between every expensive phase so a
        # cancelled job stops as early as possible.
        if self._is_cancelled:
            log.info("LLM prediction cancelled before preparing prompt.")
            return []

        # --- Prepare Prompt ---
        self.status_update.emit(f"Preparing LLM input for {base_name}...")
        try:
            prompt = self._prepare_prompt(self.file_list)
        except Exception as e:
            log.exception("Error preparing LLM prompt.")
            # Re-raise so the base handler emits prediction_error.
            raise ValueError(f"Error preparing LLM prompt: {e}") from e

        if self._is_cancelled:
            log.info("LLM prediction cancelled after preparing prompt.")
            return []

        # --- Call LLM ---
        self.status_update.emit(f"Calling LLM for {base_name}...")
        try:
            llm_response_json_str = self._call_llm(prompt)
        except Exception as e:
            log.exception("Error calling LLM API.")
            raise RuntimeError(f"Error calling LLM: {e}") from e

        if self._is_cancelled:
            log.info("LLM prediction cancelled after calling LLM.")
            return []

        # --- Parse Response ---
        self.status_update.emit(f"Parsing LLM response for {base_name}...")
        try:
            predicted_rules = self._parse_llm_response(llm_response_json_str)
        except Exception as e:
            log.exception("Error parsing LLM response.")
            raise ValueError(f"Error parsing LLM response: {e}") from e

        if self._is_cancelled:
            log.info("LLM prediction cancelled after parsing response.")
            return []

        log.info(f"LLM prediction finished successfully for "
                 f"'{self.input_source_identifier}'.")
        # The base class run() method emits prediction_ready with this result.
        return predicted_rules

    # --- Helper Methods (internal to this class) ---

    def _prepare_prompt(self, relative_file_list: List[str]) -> str:
        """
        Prepares the full prompt string to send to the LLM using stored
        settings.

        Substitutes the {ASSET_TYPE_DEFINITIONS}, {FILE_TYPE_DEFINITIONS},
        {EXAMPLE_INPUT_OUTPUT_PAIRS} and {FILE_LIST} placeholders in the
        template with values from self.llm_settings.

        Raises:
            ValueError: If the template content is missing from settings and
                the fallback template file cannot be read.
        """
        prompt_template = self.llm_settings.get('prompt_template_content')
        if not prompt_template:
            # Fall back to reading the default template file when the
            # content was not embedded in the settings dictionary.
            default_template_path = 'llm_prototype/prompt_template.txt'
            log.warning(
                f"'prompt_template_content' missing in llm_settings. "
                f"Falling back to reading default file: {default_template_path}")
            try:
                with open(default_template_path, 'r', encoding='utf-8') as f:
                    prompt_template = f.read()
            except FileNotFoundError:
                raise ValueError(
                    f"LLM predictor prompt template content missing in "
                    f"settings and default file not found at: "
                    f"{default_template_path}")
            except Exception as e:
                raise ValueError(
                    f"Error reading default LLM prompt template file "
                    f"{default_template_path}: {e}")

        if not prompt_template:  # Final check after potential fallback
            raise ValueError(
                "LLM predictor prompt template content is empty or could "
                "not be loaded.")

        # Type/example definitions are serialized to JSON so the LLM sees
        # them in the same format it must produce.
        asset_defs = json.dumps(self.llm_settings.get('asset_types', {}), indent=4)
        file_defs = json.dumps(self.llm_settings.get('file_types', {}), indent=4)
        examples = json.dumps(self.llm_settings.get('examples', []), indent=2)

        # The *relative* file list is joined into one newline-separated block.
        file_list_str = "\n".join(relative_file_list)

        prompt = prompt_template.replace('{ASSET_TYPE_DEFINITIONS}', asset_defs)
        prompt = prompt.replace('{FILE_TYPE_DEFINITIONS}', file_defs)
        prompt = prompt.replace('{EXAMPLE_INPUT_OUTPUT_PAIRS}', examples)
        prompt = prompt.replace('{FILE_LIST}', file_list_str)
        return prompt

    def _call_llm(self, prompt: str) -> str:
        """
        Calls the configured LLM API endpoint with the prepared prompt.

        Args:
            prompt: The complete prompt string.

        Returns:
            The content string from the LLM response, expected to be JSON.

        Raises:
            ValueError: If the endpoint URL is not configured or the
                response is missing the expected fields.
            requests.exceptions.RequestException: For network/timeout/HTTP
                errors (caught and wrapped by _perform_prediction).
        """
        if not self.endpoint_url:
            raise ValueError("LLM endpoint URL is not configured in settings.")

        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # Payload follows the OpenAI Chat Completions format, which local
        # endpoints (e.g. LM Studio / llama.cpp servers) also accept.
        payload = {
            "model": self.llm_settings.get("llm_model_name", "local-model"),
            "messages": [{"role": "user", "content": prompt}],
            "temperature": self.llm_settings.get("llm_temperature", 0.5),
            # "max_tokens": self.llm_settings.get("llm_max_tokens", 1024),
            # The prompt itself must instruct the LLM to return JSON; some
            # endpoints additionally support:
            # "response_format": {"type": "json_object"}
        }

        log.info(f"--- Calling LLM API: {self.endpoint_url} ---")

        # Exceptions raised here (Timeout, RequestException, ValueError)
        # propagate to _perform_prediction's handler.
        response = requests.post(
            self.endpoint_url,
            headers=headers,
            json=payload,
            timeout=self.llm_settings.get("llm_request_timeout", 120)
        )
        response.raise_for_status()  # HTTPError for 4xx/5xx responses

        response_data = response.json()

        # Extract content -- OpenAI response structure assumed.
        if "choices" in response_data and len(response_data["choices"]) > 0:
            message = response_data["choices"][0].get("message", {})
            content = message.get("content")
            if content:
                # The content itself should be the JSON string we asked for.
                log.debug("--- LLM Response Content Extracted Successfully ---")
                return content.strip()
            else:
                raise ValueError(
                    "LLM response missing 'content' in choices[0].message.")
        else:
            raise ValueError(
                "LLM response missing 'choices' array or it's empty.")

    def _parse_llm_response(self, llm_response_json_str: str) -> List[SourceRule]:
        """
        Parses the LLM's JSON response string into a list of SourceRule
        objects.

        Strips markdown code fences and <think> reasoning tags before JSON
        decoding, then validates asset/file types against the definitions in
        self.llm_settings.

        Raises:
            ValueError: If the string is not valid JSON or lacks the
                expected 'predicted_assets' structure.
        """
        # Strip potential markdown code fences before parsing.
        clean_json_str = llm_response_json_str.strip()
        if clean_json_str.startswith("```json"):
            clean_json_str = clean_json_str[7:]
        elif clean_json_str.startswith("```"):
            # FIX: also handle a bare fence with no language tag.
            clean_json_str = clean_json_str[3:]
        if clean_json_str.endswith("```"):
            clean_json_str = clean_json_str[:-3]
        clean_json_str = clean_json_str.strip()

        # FIX: remove <think>...</think> reasoning tags that some local
        # models emit before the JSON payload. (The original pattern here
        # was a no-op: a bare lazy '.*?' only matches empty strings.)
        clean_json_str = re.sub(r'<think>.*?</think>', '', clean_json_str,
                                flags=re.DOTALL | re.IGNORECASE)
        clean_json_str = clean_json_str.strip()

        try:
            response_data = json.loads(clean_json_str)
        except json.JSONDecodeError as e:
            # Log the full cleaned string that caused the error for debugging.
            error_detail = (f"Failed to decode LLM JSON response: {e}\n"
                            f"Full Cleaned Response:\n{clean_json_str}")
            log.error(error_detail)
            raise ValueError(error_detail)

        if ("predicted_assets" not in response_data
                or not isinstance(response_data["predicted_assets"], list)):
            raise ValueError(
                "Invalid LLM response format: 'predicted_assets' key "
                "missing or not a list.")

        source_rules = []
        # One SourceRule per input source processed by this handler instance.
        source_rule = SourceRule(input_path=self.input_source_identifier)

        # Valid type names come from the settings dictionary.
        valid_asset_types = list(self.llm_settings.get('asset_types', {}).keys())
        valid_file_types = list(self.llm_settings.get('file_types', {}).keys())

        for asset_data in response_data["predicted_assets"]:
            # Parsing may be long; honor cancellation inside the loop.
            if self._is_cancelled:
                log.info("LLM prediction cancelled during response parsing (assets).")
                return []

            if not isinstance(asset_data, dict):
                log.warning(f"Skipping invalid asset data (not a dict): {asset_data}")
                continue

            asset_name = asset_data.get("suggested_asset_name", "Unnamed_Asset")
            asset_type = asset_data.get("predicted_asset_type")

            if asset_type not in valid_asset_types:
                log.warning(
                    f"Invalid predicted_asset_type '{asset_type}' for asset "
                    f"'{asset_name}'. Skipping asset.")
                continue  # Skip this asset entirely

            asset_rule = AssetRule(asset_name=asset_name, asset_type=asset_type)
            source_rule.assets.append(asset_rule)

            if ("files" not in asset_data
                    or not isinstance(asset_data["files"], list)):
                log.warning(
                    f"'files' key missing or not a list in asset "
                    f"'{asset_name}'. Skipping files for this asset.")
                continue

            for file_data in asset_data["files"]:
                if self._is_cancelled:
                    log.info("LLM prediction cancelled during response parsing (files).")
                    return []

                if not isinstance(file_data, dict):
                    log.warning(
                        f"Skipping invalid file data (not a dict) in asset "
                        f"'{asset_name}': {file_data}")
                    continue

                file_path_rel = file_data.get("file_path")  # LLM provides relative path
                file_type = file_data.get("predicted_file_type")

                if not file_path_rel:
                    log.warning(
                        f"Missing 'file_path' in file data for asset "
                        f"'{asset_name}'. Skipping file.")
                    continue

                # Convert the relative path from the LLM ('/'-separated)
                # back to an absolute OS-specific path, anchored at the
                # original input source. IMPORTANT: this assumes the LLM
                # returns paths relative to the *root* of the input source.
                try:
                    base_path = Path(self.input_source_identifier)
                    # If the input was a file (like a zip), anchor relative
                    # paths at its parent directory instead.
                    if base_path.is_file():
                        base_path = base_path.parent
                    clean_rel_path = Path(file_path_rel.strip().replace('\\', '/'))
                    file_path_abs = str(base_path / clean_rel_path)
                except Exception as path_e:
                    log.warning(
                        f"Error constructing absolute path for "
                        f"'{file_path_rel}' relative to "
                        f"'{self.input_source_identifier}': {path_e}. "
                        f"Skipping file.")
                    continue

                if file_type not in valid_file_types:
                    log.warning(
                        f"Invalid predicted_file_type '{file_type}' for file "
                        f"'{file_path_rel}'. Defaulting to EXTRA.")
                    file_type = "EXTRA"  # Default when LLM gives an unknown type

                # Fields the LLM does not predict get explicit defaults.
                file_rule = FileRule(
                    file_path=file_path_abs,
                    item_type=file_type,
                    item_type_override=file_type,       # Initial override
                    target_asset_name_override=asset_name,  # Default to asset name
                    output_format_override=None,
                    is_gloss_source=False,              # LLM doesn't predict this
                    resolution_override=None,
                    channel_merge_instructions={}
                )
                asset_rule.files.append(file_rule)

        source_rules.append(source_rule)
        return source_rules