import argparse
import sys
import logging
import logging.handlers
import time
import json
import shutil  # Import shutil for directory operations
from pathlib import Path
from typing import List, Dict, Any

from PySide6.QtCore import QCoreApplication, QTimer, Slot, QEventLoop, QObject, Signal
from PySide6.QtWidgets import QApplication, QListWidgetItem

# Add project root to sys.path
project_root = Path(__file__).resolve().parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

try:
    from main import App
    from gui.main_window import MainWindow
    from rule_structure import SourceRule  # Assuming SourceRule is in rule_structure.py
except ImportError as e:
    print(f"Error importing project modules: {e}")
    print("Ensure that the script is run from the project root or that the project root is in PYTHONPATH.")
    print(f"Current sys.path: {sys.path}")
    sys.exit(1)

# Global variable for the memory log handler
autotest_memory_handler = None


# Custom Log Filter for Concise Output
class InfoSummaryFilter(logging.Filter):
    # Keywords that identify INFO messages to *allow* for concise output
    SUMMARY_KEYWORDS_PRECISE = [
        "Test run completed",
        "Test succeeded",
        "Test failed",
        "Rule comparison successful",
        "Rule comparison failed",
        "ProcessingEngine finished. Summary:",
        "Autotest Context:",
        "Parsed CLI arguments:",
        "Prediction completed successfully.",
        "Processing completed.",
        "Signal 'all_tasks_finished' received",
        "final status:",  # To catch "Asset '...' final status:"
        "User settings file not found:",
        "MainPanelWidget: Default output directory set to:",
        # Search related (as per original filter)
        "Searching logs for term",
        "Search term ",
        "Found ",
        "No tracebacks found in the logs.",
        "--- End Log Analysis ---",
        "Log analysis completed.",
    ]
    # Patterns for case-insensitive rejection
    REJECT_PATTERNS_LOWER = [
        # Original debug prefixes (ensure these are still relevant or merge if needed)
        "debug:", "orchestrator_trace:", "configuration_debug:", "app_debug:", "output_org_debug:",
        # Iterative / Per-item / Per-file details / Intermediate steps
        ": item ",  # Catches "Asset '...', Item X/Y"
        "item successfully processed and saved",
        ", file '",  # Catches "Asset '...', File '...'"
        ": processing regular map",
        ": found source file:",
        ": determined source bit depth:",
        "successfully processed regular map",
        "successfully created mergetaskdefinition",
        ": preparing processing items",
        ": finished preparing items. found",
        ": starting core item processing loop",
        ", task '",
        ": processing merge task",
        "loaded from context:",
        "using dimensions from first loaded input",
        "successfully merged inputs into image",
        "successfully processed merge task",
        "mergedtaskprocessorstage result",
        "calling savevariantsstage",
        "savevariantsstage result",
        "adding final details to context",
        ": finished core item processing loop",
        ": copied variant",
        ": copied extra file",
        ": successfully organized",
        ": output organization complete.",
        ": metadata saved to",
        "worker thread: starting processing for rule:",
        "preparing workspace for input:",
        "input is a supported archive",
        "calling processingengine.process with rule",
        "calculated sha5 for",
        "calculated next incrementing value for",
        "verify: processingengine.process called",
        ": effective supplier set to",
        ": metadata initialized.",
        "path",
        "\\asset_processor",
        ": file rules queued for processing",
        "successfully loaded base application settings",
        "successfully loaded and merged asset_type_definitions",
        "successfully loaded and merged file_type_definitions",
        "starting rule-based prediction for:",
        "rule-based prediction finished successfully for",
        "finished rule-based prediction run for",
        "updating model with rule-based results for source:",
        "debug task ",
        "worker thread: finished processing for rule:",
        "task finished signal received for",
        # Autotest step markers (not global summaries)
    ]

    def filter(self, record):
        # Allow CRITICAL, ERROR, WARNING unconditionally.
        if record.levelno >= logging.WARNING:
            return True

        if record.levelno == logging.INFO:
            msg = record.getMessage()
            msg_lower = msg.lower()  # For case-insensitive pattern rejection

            # 1. Explicitly REJECT if the message contains a verbose pattern (case-insensitive).
            for pattern in self.REJECT_PATTERNS_LOWER:
                if pattern in msg_lower:
                    return False  # Reject

            # 2. If not rejected, ALLOW only if the message contains a precise summary keyword.
            for keyword in self.SUMMARY_KEYWORDS_PRECISE:
                if keyword in msg:  # Match against the original message, so keywords stay case-sensitive
                    return True  # Allow

            # 3. Reject all other INFO messages that don't match a precise summary keyword.
            return False

        # Reject levels below INFO (e.g., DEBUG) by default for this handler.
        return False

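# Illustrative effect of the filter on the concise console handler (a sketch, assuming the
# application emits messages matching the keyword and reject lists above):
#   logger.warning("anything at WARNING or above")              -> shown (always allowed)
#   logger.info("Rule comparison successful.")                  -> shown (matches a summary keyword)
#   logger.info("Asset 'X': processing regular map ...")        -> hidden (matches a reject pattern)
#   logger.debug("any debug detail")                            -> hidden (below the console handler's INFO level)
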
# --- Root Logger Configuration for Concise Console Output ---
def setup_autotest_logging():
    """
    Configures the root logger for concise console output for autotest.py.
    This ensures that only essential summary information, warnings, and errors
    are displayed on the console by default.
    """
    root_logger = logging.getLogger()

    # 1. Remove all existing handlers from the root logger.
    #    This prevents interference from other logging configurations.
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        handler.close()  # Release the removed handler's resources

    # 2. Set the root logger's level to DEBUG to capture everything for the memory handler.
    #    The console handler will still filter down to INFO/selected.
    root_logger.setLevel(logging.DEBUG)

    # 3. Create a new StreamHandler for sys.stdout (for concise console output).
    console_handler = logging.StreamHandler(sys.stdout)

    # 4. Set this console handler's level to INFO.
    #    The filter will then decide which INFO messages to display on the console.
    console_handler.setLevel(logging.INFO)

    # 5. Apply the InfoSummaryFilter to the console handler.
    info_filter = InfoSummaryFilter()
    console_handler.addFilter(info_filter)

    # 6. Set a concise formatter for the console handler.
    formatter = logging.Formatter('[%(levelname)s] %(message)s')
    console_handler.setFormatter(formatter)

    # 7. Add the newly configured console handler to the root logger.
    root_logger.addHandler(console_handler)

    # 8. Set up the MemoryHandler that keeps the full log in memory.
    global autotest_memory_handler
    autotest_memory_handler = logging.handlers.MemoryHandler(
        capacity=20000,                   # Number of records retained in memory
        flushLevel=logging.CRITICAL + 1,  # Prevent automatic flushing
        target=None                       # Does not flush to another handler
    )
    autotest_memory_handler.setLevel(logging.DEBUG)  # Capture all logs from DEBUG up
    # No formatter here; records are formatted in _process_and_display_logs.

    # 9. Add the memory handler to the root logger.
    root_logger.addHandler(autotest_memory_handler)


# Call the setup function early in the script's execution.
setup_autotest_logging()

# Logger for autotest.py's own messages.
# Messages from this logger propagate to the root logger and are filtered
# by the console_handler configured above.
# Setting its level to DEBUG allows autotest.py to generate DEBUG messages,
# which won't appear on the concise console (due to the handler's INFO level)
# but can be captured by other handlers (e.g., the GUI's log console).
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # Ensure autotest.py can generate DEBUGs for other handlers

# Note: The GUI's log console (e.g., self.main_window.log_console.log_console_output)
# is assumed to capture all logs (including DEBUG) from the various modules.
# _process_and_display_logs sources the --search feature primarily from the in-memory
# handler configured above, falling back to the GUI console text only for the traceback
# check when the memory buffer is empty.
# This root logger setup primarily makes autotest.py's direct console output concise,
# ensuring that only filtered, high-level information appears on stdout by default.
# --- End of Root Logger Configuration ---

# --- Argument Parsing ---
def parse_arguments():
    """Parses command-line arguments for the autotest script."""
    parser = argparse.ArgumentParser(description="Automated test script for Asset Processor GUI.")
    parser.add_argument(
        "--zipfile",
        type=Path,
        default=project_root / "TestFiles" / "BoucleChunky001.zip",
        help="Path to the test asset ZIP file. Default: TestFiles/BoucleChunky001.zip"
    )
    parser.add_argument(
        "--preset",
        type=str,
        default="Dinesen",  # This should match a preset name in the application
        help="Name of the preset to use. Default: Dinesen"
    )
    parser.add_argument(
        "--expectedrules",
        type=Path,
        default=project_root / "TestFiles" / "Test-BoucleChunky001.json",
        help="Path to the JSON file with expected rules. Default: TestFiles/Test-BoucleChunky001.json"
    )
    parser.add_argument(
        "--outputdir",
        type=Path,
        default=project_root / "TestFiles" / "TestOutputs" / "BoucleChunkyOutput",
        help="Path for processing output. Default: TestFiles/TestOutputs/BoucleChunkyOutput"
    )
    parser.add_argument(
        "--search",
        type=str,
        default=None,
        help="Optional log search term. Default: None"
    )
    parser.add_argument(
        "--additional-lines",
        type=int,
        default=0,
        help="Context lines for log search. Default: 0"
    )
    return parser.parse_args()

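# Illustrative invocations (a sketch; assumes the default test files referenced above
# exist under the project's TestFiles directory):
#   python autotest.py
#   python autotest.py --zipfile TestFiles/BoucleChunky001.zip --preset Dinesen
#   python autotest.py --search "Traceback" --additional-lines 5
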
class AutoTester(QObject):
    """
    Handles the automated testing process for the Asset Processor GUI.
    """
    # Define signals if needed, e.g., for specific test events
    # test_step_completed = Signal(str)

    def __init__(self, app_instance: App, cli_args: argparse.Namespace):
        super().__init__()
        self.app_instance: App = app_instance
        self.main_window: MainWindow = app_instance.main_window
        self.cli_args: argparse.Namespace = cli_args
        self.event_loop = QEventLoop(self)
        self.prediction_poll_timer = QTimer(self)
        self.expected_rules_data: Dict[str, Any] = {}
        # Values used in this script: INIT, LOADING_EXPECTED_RULES, LOADING_ZIP, SELECTING_PRESET,
        # AWAITING_PREDICTION, PREDICTION_COMPLETE, COMPARING_RULES, START_PROCESSING,
        # AWAIT_PROCESSING, PROCESSING_COMPLETE, CHECK_OUTPUT, CHECK_LOGS
        self.test_step: str = "INIT"

        if not self.main_window:
            logger.error("MainWindow instance not found in App. Cannot proceed.")
            self.cleanup_and_exit(success=False)
            return

        # Connect signals. On an instance, a class-level Signal is exposed as a bound
        # signal object, so check for a connect() method rather than isinstance(..., Signal).
        all_tasks_signal = getattr(self.app_instance, 'all_tasks_finished', None)
        if all_tasks_signal is not None and hasattr(all_tasks_signal, 'connect'):
            all_tasks_signal.connect(self._on_all_tasks_finished)
        else:
            logger.warning("App instance does not expose a connectable 'all_tasks_finished' signal. Processing completion might not be detected.")

        self._load_expected_rules()

    def _load_expected_rules(self) -> None:
        """Loads the expected rules from the JSON file specified by cli_args."""
        self.test_step = "LOADING_EXPECTED_RULES"
        logger.debug(f"Loading expected rules from: {self.cli_args.expectedrules}")
        try:
            with open(self.cli_args.expectedrules, 'r') as f:
                self.expected_rules_data = json.load(f)
            logger.debug("Expected rules loaded successfully.")
        except FileNotFoundError:
            logger.error(f"Expected rules file not found: {self.cli_args.expectedrules}")
            self.cleanup_and_exit(success=False)
        except json.JSONDecodeError as e:
            logger.error(f"Error decoding expected rules JSON: {e}")
            self.cleanup_and_exit(success=False)
        except Exception as e:
            logger.error(f"An unexpected error occurred while loading expected rules: {e}")
            self.cleanup_and_exit(success=False)

    def run_test(self) -> None:
        """Orchestrates the test steps."""
        logger.info("Starting test run...")

        if not self.expected_rules_data:  # Ensure rules were loaded
            logger.error("Expected rules not loaded. Aborting test.")
            self.cleanup_and_exit(success=False)
            return

        # Summary log with the essential context for this run.
        logger.info(f"Autotest Context: Input='{self.cli_args.zipfile.name}', Preset='{self.cli_args.preset}', Output='{self.cli_args.outputdir}'")

        # Step 1: Load ZIP
        self.test_step = "LOADING_ZIP"
        logger.info(f"Step 1: Loading ZIP file: {self.cli_args.zipfile}")
        if not self.cli_args.zipfile.exists():
            logger.error(f"ZIP file not found: {self.cli_args.zipfile}")
            self.cleanup_and_exit(success=False)
            return
        try:
            # Assuming add_input_paths can take a list of strings or Path objects
            self.main_window.add_input_paths([str(self.cli_args.zipfile)])
            logger.debug("ZIP file loading initiated.")
        except Exception as e:
            logger.error(f"Error during ZIP file loading: {e}")
            self.cleanup_and_exit(success=False)
            return

        # Step 2: Select Preset
        self.test_step = "SELECTING_PRESET"
        logger.info(f"Step 2: Selecting preset: {self.cli_args.preset}")
        preset_found = False
        preset_list_widget = self.main_window.preset_editor_widget.editor_preset_list
        for i in range(preset_list_widget.count()):
            item = preset_list_widget.item(i)
            if item and item.text() == self.cli_args.preset:
                preset_list_widget.setCurrentItem(item)
                logger.debug(f"Preset '{self.cli_args.preset}' selected.")
                preset_found = True
                break
        if not preset_found:
            logger.error(f"Preset '{self.cli_args.preset}' not found in the list.")
            available_presets = [preset_list_widget.item(i).text() for i in range(preset_list_widget.count())]
            logger.debug(f"Available presets: {available_presets}")
            self.cleanup_and_exit(success=False)
            return

        # Step 3: Await Prediction Completion
        self.test_step = "AWAITING_PREDICTION"
        logger.debug("Step 3: Awaiting prediction completion...")
        self.prediction_poll_timer.timeout.connect(self._check_prediction_status)
        self.prediction_poll_timer.start(500)  # Poll every 500 ms

        # Run a local event loop so Qt can process events while we wait;
        # _check_prediction_status quits this loop once the prediction is done.
        logger.debug("Starting event loop for prediction...")
        self.event_loop.exec()  # This loop is quit by _check_prediction_status
        self.prediction_poll_timer.stop()
        logger.debug("Event loop for prediction finished.")

        if self.test_step != "PREDICTION_COMPLETE":
            logger.error(f"Prediction did not complete as expected. Current step: {self.test_step}")
            # Check if there were any pending predictions that never cleared
            if hasattr(self.main_window, '_pending_predictions'):
                logger.error(f"Pending predictions at timeout: {self.main_window._pending_predictions}")
            self.cleanup_and_exit(success=False)
            return
        logger.info("Prediction completed successfully.")  # KEEP INFO - Passes filter

        # Step 4: Retrieve & Compare Rule List
        self.test_step = "COMPARING_RULES"
        logger.info("Step 4: Retrieving and Comparing Rules...")
        actual_source_rules_list: List[SourceRule] = self.main_window.unified_model.get_all_source_rules()
        actual_rules_obj = actual_source_rules_list  # Keep the SourceRule list for processing

        comparable_actual_rules = self._convert_rules_to_comparable(actual_source_rules_list)

        if not self._compare_rules(comparable_actual_rules, self.expected_rules_data):
            logger.error("Rule comparison failed. See logs for details.")
            self.cleanup_and_exit(success=False)
            return
        logger.info("Rule comparison successful.")  # KEEP INFO - Passes filter

        # Step 5: Start Processing
        self.test_step = "START_PROCESSING"
        logger.info("Step 5: Starting Processing...")
        processing_settings = {
            "output_dir": str(self.cli_args.outputdir),  # Ensure it's a string for JSON/config
            "overwrite": True,
            "workers": 1,
            "blender_enabled": False  # Basic test, no Blender
        }
        try:
            Path(self.cli_args.outputdir).mkdir(parents=True, exist_ok=True)
            logger.debug(f"Ensured output directory exists: {self.cli_args.outputdir}")
        except Exception as e:
            logger.error(f"Could not create output directory {self.cli_args.outputdir}: {e}")
            self.cleanup_and_exit(success=False)
            return

        # As with 'all_tasks_finished', the bound signal is checked for an emit() method
        # rather than with isinstance(..., Signal).
        start_processing_signal = getattr(self.main_window, 'start_backend_processing', None)
        if start_processing_signal is not None and hasattr(start_processing_signal, 'emit'):
            logger.debug(f"Emitting start_backend_processing with rules count: {len(actual_rules_obj)} and settings: {processing_settings}")
            start_processing_signal.emit(actual_rules_obj, processing_settings)
        else:
            logger.error("'start_backend_processing' signal not found on MainWindow. Cannot start processing.")
            self.cleanup_and_exit(success=False)
            return

        # Step 6: Await Processing Completion
        self.test_step = "AWAIT_PROCESSING"
        logger.debug("Step 6: Awaiting processing completion...")
        self.event_loop.exec()  # This loop is quit by _on_all_tasks_finished

        if self.test_step != "PROCESSING_COMPLETE":
            logger.error(f"Processing did not complete as expected. Current step: {self.test_step}")
            self.cleanup_and_exit(success=False)
            return
        logger.info("Processing completed.")  # KEEP INFO - Passes filter

        # Step 7: Check Output Path
        self.test_step = "CHECK_OUTPUT"
        logger.info(f"Step 7: Checking output path: {self.cli_args.outputdir}")
        output_path = Path(self.cli_args.outputdir)
        if not output_path.exists() or not output_path.is_dir():
            logger.error(f"Output directory {output_path} does not exist or is not a directory.")
            self.cleanup_and_exit(success=False)
            return

        output_items = list(output_path.iterdir())
        if not output_items:
            logger.warning(f"Output directory {output_path} is empty. This might be a test failure depending on the case.")
            # For a more specific check, one might iterate through actual_rules_obj and
            # verify that subdirectories matching asset_name exist, e.g.:
            #     for asset_rule in source_rule.assets:
            #         expected_asset_dir = output_path / asset_rule.asset_name
            #         if not expected_asset_dir.is_dir(): logger.error(...)
        else:
            logger.debug(f"Found {len(output_items)} item(s) in output directory:")
            for item in output_items:
                logger.debug(f"  - {item.name} ({'dir' if item.is_dir() else 'file'})")
        logger.info("Output path check completed.")

        # Step 8: Retrieve & Analyze Logs
        self.test_step = "CHECK_LOGS"
        logger.debug("Step 8: Retrieving and Analyzing Logs...")
        all_logs_text = ""
        if self.main_window.log_console and self.main_window.log_console.log_console_output:
            all_logs_text = self.main_window.log_console.log_console_output.toPlainText()
        else:
            logger.warning("Log console or output widget not found. Cannot retrieve logs.")

        self._process_and_display_logs(all_logs_text)
        logger.info("Log analysis completed.")

        # Final Step
        logger.info("Test run completed successfully.")  # KEEP INFO - Passes filter
        self.cleanup_and_exit(success=True)

    @Slot()
    def _check_prediction_status(self) -> None:
        """Polls the main window for pending predictions."""
        if hasattr(self.main_window, '_pending_predictions'):
            if not self.main_window._pending_predictions:  # Empty when all predictions are done
                logger.debug("No pending predictions. Prediction assumed complete.")
                self.test_step = "PREDICTION_COMPLETE"
                if self.event_loop.isRunning():
                    self.event_loop.quit()
            # else:
            #     logger.debug(f"Still awaiting predictions: {len(self.main_window._pending_predictions)} remaining.")
        else:
            logger.warning("'_pending_predictions' attribute not found on MainWindow. Cannot check prediction status automatically.")
            # Fallback: if the attribute is missing, assume the prediction is done.
            # This is a risky assumption; a better approach would be a dedicated signal
            # from MainWindow emitted when predictions are complete.
            self.test_step = "PREDICTION_COMPLETE"  # Risky assumption
            if self.event_loop.isRunning():
                self.event_loop.quit()

    @Slot(int, int, int)
    def _on_all_tasks_finished(self, processed_count: int, skipped_count: int, failed_count: int) -> None:
        """Slot for App.all_tasks_finished signal."""
        logger.info(f"Signal 'all_tasks_finished' received: Processed={processed_count}, Skipped={skipped_count}, Failed={failed_count}")  # KEEP INFO - Passes filter

        if self.test_step == "AWAIT_PROCESSING":
            logger.debug("Processing completion signal received.")  # Covered by the summary log above
            if failed_count > 0:
                logger.error(f"Processing finished with {failed_count} failed task(s).")
                # Even if tasks failed, the test might pass based on output checks.
                # The error is logged for information.
            self.test_step = "PROCESSING_COMPLETE"
            if self.event_loop.isRunning():
                self.event_loop.quit()
        else:
            logger.warning(f"Signal 'all_tasks_finished' received at an unexpected test step: '{self.test_step}'. Counts: P={processed_count}, S={skipped_count}, F={failed_count}")

    def _convert_rules_to_comparable(self, source_rules_list: List[SourceRule]) -> Dict[str, Any]:
        """
        Converts a list of SourceRule objects to a dictionary structure
        suitable for comparison with the expected_rules.json.
        """
        logger.debug(f"Converting {len(source_rules_list)} SourceRule objects to comparable dictionary...")
        comparable_sources_list = []
        for source_rule_obj in source_rules_list:
            comparable_asset_list = []
            # source_rule_obj.assets is List[AssetRule]
            for asset_rule_obj in source_rule_obj.assets:
                comparable_file_list = []
                # asset_rule_obj.files is List[FileRule]
                for file_rule_obj in asset_rule_obj.files:
                    comparable_file_list.append({
                        "file_path": file_rule_obj.file_path,
                        "item_type": file_rule_obj.item_type,
                        "target_asset_name_override": file_rule_obj.target_asset_name_override
                    })
                comparable_asset_list.append({
                    "asset_name": asset_rule_obj.asset_name,
                    "asset_type": asset_rule_obj.asset_type,
                    "files": comparable_file_list
                })
            comparable_sources_list.append({
                "input_path": Path(source_rule_obj.input_path).name,  # Use only the filename
                "supplier_identifier": source_rule_obj.supplier_identifier,
                "preset_name": source_rule_obj.preset_name,
                "assets": comparable_asset_list
            })
        logger.debug("Conversion to comparable dictionary finished.")
        return {"source_rules": comparable_sources_list}

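    # Illustrative shape of the structure returned above, which is also the layout that the
    # --expectedrules JSON file is compared against (field names are taken from the code
    # above; the values here are hypothetical placeholders):
    #
    # {
    #   "source_rules": [
    #     {
    #       "input_path": "BoucleChunky001.zip",
    #       "supplier_identifier": "...",
    #       "preset_name": "Dinesen",
    #       "assets": [
    #         {
    #           "asset_name": "...",
    #           "asset_type": "...",
    #           "files": [
    #             {"file_path": "...", "item_type": "...", "target_asset_name_override": null}
    #           ]
    #         }
    #       ]
    #     }
    #   ]
    # }
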
    def _compare_rule_item(self, actual_item: Dict[str, Any], expected_item: Dict[str, Any], item_type_name: str, parent_context: str = "") -> bool:
        """
        Recursively compares an individual actual rule item dictionary with an expected rule item dictionary.
        Logs differences and returns True if they match, False otherwise.
        """
        item_match = True

        identifier = ""
        if item_type_name == "SourceRule":
            identifier = expected_item.get('input_path', f'UnknownSource_at_{parent_context}')
        elif item_type_name == "AssetRule":
            identifier = expected_item.get('asset_name', f'UnknownAsset_at_{parent_context}')
        elif item_type_name == "FileRule":
            identifier = expected_item.get('file_path', f'UnknownFile_at_{parent_context}')

        current_context = f"{parent_context}/{identifier}" if parent_context else identifier

        # Log Extra Fields: Iterate through keys in actual_item.
        # If a key is in actual_item but not in expected_item (and is not a list container
        # like "assets" or "files"), log this as an informational message.
        for key in actual_item.keys():
            if key not in expected_item and key not in ["assets", "files"]:
                logger.debug(f"Field '{key}' present in actual {item_type_name} ({current_context}) but not specified in expected. Value: '{actual_item[key]}'")

        # Check Expected Fields: Iterate through keys in expected_item.
        for key, expected_value in expected_item.items():
            if key not in actual_item:
                logger.error(f"Missing expected field '{key}' in actual {item_type_name} ({current_context}).")
                item_match = False
                continue  # Continue to check other fields in the expected_item

            actual_value = actual_item[key]

            if key == "assets":  # List of AssetRule dictionaries
                if not self._compare_list_of_rules(actual_value, expected_value, "AssetRule", current_context, "asset_name"):
                    item_match = False
            elif key == "files":  # List of FileRule dictionaries
                if not self._compare_list_of_rules(actual_value, expected_value, "FileRule", current_context, "file_path"):
                    item_match = False
            else:  # Regular field comparison
                if actual_value != expected_value:
                    # Handle None vs "None" string for preset_name specifically if it's a common issue
                    if key == "preset_name" and actual_value is None and expected_value == "None":
                        logger.debug(f"Field '{key}' in {item_type_name} ({current_context}): Actual is None, Expected is string \"None\". Treating as match for now.")
                    elif key == "target_asset_name_override" and actual_value is not None and expected_value is None:
                        # If actual has a value (e.g. the parent asset name) and expected is null/None,
                        # this is a mismatch according to strict comparison. A more lenient check
                        # could be added here; the current strict comparison flags it as an error,
                        # which is what the logs show.
                        logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
                        item_match = False
                    else:
                        logger.error(f"Value mismatch for field '{key}' in {item_type_name} ({current_context}): Actual='{actual_value}', Expected='{expected_value}'.")
                        item_match = False

        return item_match

    def _compare_list_of_rules(self, actual_list: List[Dict[str, Any]], expected_list: List[Dict[str, Any]], item_type_name: str, parent_context: str, item_key_field: str) -> bool:
        """
        Compares a list of actual rule items against a list of expected rule items.
        Items are matched by a key field (e.g., 'asset_name' or 'file_path').
        Matching is order-independent, but count mismatches are logged as errors.
        """
        list_match = True
        if not isinstance(actual_list, list) or not isinstance(expected_list, list):
            logger.error(f"Type mismatch for list of {item_type_name}s in {parent_context}. Expected lists.")
            return False

        if len(actual_list) != len(expected_list):
            logger.error(f"Mismatch in number of {item_type_name}s for {parent_context}. Actual: {len(actual_list)}, Expected: {len(expected_list)}.")
            list_match = False  # Count mismatch is an error
            # If counts differ, we still try to match what we can to provide more detailed feedback,
            # but the overall list_match will remain False.

        actual_items_map = {item.get(item_key_field): item for item in actual_list if item.get(item_key_field) is not None}

        # Keep track of expected items that found a match to identify missing ones more easily
        matched_expected_keys = set()

        for expected_item in expected_list:
            expected_key_value = expected_item.get(item_key_field)
            if expected_key_value is None:
                logger.error(f"Expected {item_type_name} in {parent_context} is missing key field '{item_key_field}'. Cannot compare this item: {expected_item}")
                list_match = False  # This specific expected item cannot be processed
                continue

            actual_item = actual_items_map.get(expected_key_value)
            if actual_item:
                matched_expected_keys.add(expected_key_value)
                if not self._compare_rule_item(actual_item, expected_item, item_type_name, parent_context):
                    list_match = False  # Individual item comparison failed
            else:
                logger.error(f"Expected {item_type_name} with {item_key_field} '{expected_key_value}' not found in actual items for {parent_context}.")
                list_match = False

        # Identify actual items that were not matched by any expected item.
        # This is useful if len(actual_list) >= len(expected_list) but some actual items are "extra".
        for actual_key_value, actual_item_data in actual_items_map.items():
            if actual_key_value not in matched_expected_keys:
                logger.debug(f"Extra actual {item_type_name} with {item_key_field} '{actual_key_value}' found in {parent_context} (not in expected list or already matched).")
                if len(actual_list) != len(expected_list):  # If counts already flagged a mismatch, this is just detail
                    pass
                else:  # Counts matched, but content didn't align perfectly by key
                    list_match = False

        return list_match

    def _compare_rules(self, actual_rules_data: Dict[str, Any], expected_rules_data: Dict[str, Any]) -> bool:
        """
        Compares the actual rule data (converted from live SourceRule objects)
        with the expected rule data (loaded from JSON).
        """
        logger.debug("Comparing actual rules with expected rules...")

        actual_source_rules = actual_rules_data.get("source_rules", []) if actual_rules_data else []
        expected_source_rules = expected_rules_data.get("source_rules", []) if expected_rules_data else []

        if not isinstance(actual_source_rules, list):
            logger.error(f"Actual 'source_rules' is not a list. Found type: {type(actual_source_rules)}. Comparison aborted.")
            return False  # Cannot compare if actual data is malformed
        if not isinstance(expected_source_rules, list):
            logger.error(f"Expected 'source_rules' is not a list. Found type: {type(expected_source_rules)}. Test configuration error. Comparison aborted.")
            return False  # Test setup error

        if not expected_source_rules and not actual_source_rules:
            logger.debug("Both expected and actual source rules lists are empty. Considered a match.")
            return True

        if len(actual_source_rules) != len(expected_source_rules):
            logger.error(f"Mismatch in the number of source rules. Actual: {len(actual_source_rules)}, Expected: {len(expected_source_rules)}.")
            # Optionally, log more details about which list is longer/shorter or identifiers if available
            return False

        overall_match_status = True
        for i in range(len(expected_source_rules)):
            actual_sr = actual_source_rules[i]
            expected_sr = expected_source_rules[i]

            # For context, use input_path or an index
            source_rule_context = expected_sr.get('input_path', f"SourceRule_index_{i}")

            if not self._compare_rule_item(actual_sr, expected_sr, "SourceRule", parent_context=source_rule_context):
                overall_match_status = False
                # Continue checking other source rules to log all discrepancies

        if overall_match_status:
            logger.debug("All rules match the expected criteria.")  # Covered by "Rule comparison successful" summary
        else:
            logger.warning("One or more rules did not match the expected criteria. See logs above for details.")

        return overall_match_status

    def _process_and_display_logs(self, logs_text: str) -> None:  # logs_text is only a fallback source
        """
        Processes and displays logs, optionally filtering them if --search is used.
        Also checks for tracebacks.
        Sources logs from the in-memory handler for search and detailed analysis.
        """
        logger.debug("--- Log Analysis ---")
        global autotest_memory_handler  # Access the global handler
        log_records = []
        if autotest_memory_handler and autotest_memory_handler.buffer:
            log_records = autotest_memory_handler.buffer

        formatted_log_lines = []
        # Use a consistent formatter for the searchable output.
        # The default asctime format includes milliseconds.
        record_formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')

        for record in log_records:
            formatted_log_lines.append(record_formatter.format(record))

        lines_for_search_and_traceback = formatted_log_lines

        if not lines_for_search_and_traceback:
            logger.warning("No log records found in memory handler. No analysis to perform.")
            # Still check the console logs_text for tracebacks, as a fallback in case
            # some critical errors did not make it to the memory handler.
            if logs_text:
                logger.debug("Checking provided logs_text (from console) for tracebacks as a fallback.")
                console_lines = logs_text.splitlines()
                traceback_found_console = False
                for i, line in enumerate(console_lines):
                    if line.strip().startswith("Traceback (most recent call last):"):
                        logger.error(f"!!! TRACEBACK DETECTED in console logs_text around line {i+1} !!!")
                        traceback_found_console = True
                if traceback_found_console:
                    logger.warning("A traceback was found in the console logs_text.")
                else:
                    logger.info("No tracebacks found in the console logs_text either.")
            logger.info("--- End Log Analysis ---")
            return

        traceback_found = False

        if self.cli_args.search:
            logger.info(f"Searching {len(lines_for_search_and_traceback)} in-memory log lines for term '{self.cli_args.search}' with {self.cli_args.additional_lines} context lines.")
            matched_line_indices = [i for i, line in enumerate(lines_for_search_and_traceback) if self.cli_args.search in line]

            if not matched_line_indices:
                logger.info(f"Search term '{self.cli_args.search}' not found in in-memory logs.")
            else:
                logger.info(f"Found {len(matched_line_indices)} match(es) for '{self.cli_args.search}' in in-memory logs:")
                collected_lines_to_print = set()
                for match_idx in matched_line_indices:
                    start_idx = max(0, match_idx - self.cli_args.additional_lines)
                    end_idx = min(len(lines_for_search_and_traceback), match_idx + self.cli_args.additional_lines + 1)
                    for i in range(start_idx, end_idx):
                        # i indexes lines_for_search_and_traceback; the L-prefixed number is for display only.
                        collected_lines_to_print.add(f"L{i+1:05d}: {lines_for_search_and_traceback[i]}")

                print("--- Filtered Log Output (from Memory Handler) ---")
                for line_to_print in sorted(list(collected_lines_to_print)):
                    print(line_to_print)
                print("--- End Filtered Log Output ---")
        # When --search is not given, no log excerpt is printed here; the traceback check below still runs.

        # Traceback check (on lines_for_search_and_traceback)
        for i, line in enumerate(lines_for_search_and_traceback):
            if "Traceback (most recent call last):" in line:  # Substring check also catches prefixed/formatted lines
                logger.error(f"!!! TRACEBACK DETECTED in in-memory logs around line index {i} !!!")
                logger.error(f"Line content: {line}")
                traceback_found = True

        if traceback_found:
            logger.warning("A traceback was found in the in-memory logs. This usually indicates a significant issue.")
        else:
            logger.info("No tracebacks found in the in-memory logs.")  # Refers to the comprehensive in-memory logs

        logger.info("--- End Log Analysis ---")

    def cleanup_and_exit(self, success: bool = True) -> None:
        """Cleans up and exits the application."""
        global autotest_memory_handler
        if autotest_memory_handler:
            logger.debug("Clearing memory log handler buffer and removing handler.")
            autotest_memory_handler.buffer = []  # Clear buffer
            logging.getLogger().removeHandler(autotest_memory_handler)  # Remove handler
            autotest_memory_handler.close()  # MemoryHandler close is a no-op but good practice
            autotest_memory_handler = None

        logger.info(f"Test {'succeeded' if success else 'failed'}. Cleaning up and exiting...")  # KEEP INFO - Passes filter
        q_app = QCoreApplication.instance()
        if q_app:
            q_app.quit()
        sys.exit(0 if success else 1)

# --- Main Execution ---
def main():
    """Main function to run the autotest script."""
    cli_args = parse_arguments()
    # Logging is configured above; this uses the filtered setup.
    logger.info(f"Parsed CLI arguments: {cli_args}")  # KEEP INFO - Passes filter

    # Clean and ensure the output directory exists.
    output_dir_path = Path(cli_args.outputdir)
    logger.debug(f"Preparing output directory: {output_dir_path}")
    try:
        if output_dir_path.exists():
            logger.debug(f"Output directory {output_dir_path} exists. Cleaning its contents...")
            for item in output_dir_path.iterdir():
                if item.is_dir():
                    shutil.rmtree(item)
                    logger.debug(f"Removed directory: {item}")
                else:
                    item.unlink()
                    logger.debug(f"Removed file: {item}")
            logger.debug(f"Contents of {output_dir_path} cleaned.")
        else:
            logger.debug(f"Output directory {output_dir_path} does not exist. Creating it.")

        output_dir_path.mkdir(parents=True, exist_ok=True)  # Ensure it exists after cleaning / if it didn't exist
        logger.debug(f"Output directory {output_dir_path} is ready.")

    except Exception as e:
        logger.error(f"Could not prepare output directory {output_dir_path}: {e}", exc_info=True)
        sys.exit(1)

    # Initialize QApplication.
    # QCoreApplication would suffice if no GUI elements were touched by the test logic,
    # but QApplication is needed because MainWindow and its widgets are constructed and used.
    q_app = QApplication.instance()
    if not q_app:
        q_app = QApplication(sys.argv)
    if not q_app:  # Still no app
        logger.error("Failed to initialize QApplication.")
        sys.exit(1)

    logger.debug("Initializing main.App()...")
    try:
        # Instantiate main.App(); this should create MainWindow without showing it,
        # as long as app.main_window.show() is never called.
        app_instance = App()
    except Exception as e:
        logger.error(f"Failed to initialize main.App: {e}", exc_info=True)
        sys.exit(1)

    if not app_instance.main_window:
        logger.error("main.App initialized, but main_window is None. Cannot proceed with test.")
        sys.exit(1)

    logger.debug("Initializing AutoTester...")
    try:
        tester = AutoTester(app_instance, cli_args)
    except Exception as e:
        logger.error(f"Failed to initialize AutoTester: {e}", exc_info=True)
        sys.exit(1)

    # Use QTimer.singleShot to start the test after the Qt event loop has started.
    # This ensures that the Qt environment is fully set up.
    logger.debug("Scheduling test run...")
    QTimer.singleShot(0, tester.run_test)

    logger.debug("Starting Qt application event loop...")
    exit_code = q_app.exec()
    logger.debug(f"Qt application event loop finished with exit code: {exit_code}")
    sys.exit(exit_code)


if __name__ == "__main__":
    main()