# Asset-Frameworker/monitor.py
# Snapshot: 2025-04-29 18:26:13 +02:00 (221 lines, 9.8 KiB, Python)

# monitor.py
import os
import sys
import time
import logging
import re
import shutil
from pathlib import Path
from watchdog.observers.polling import PollingObserver as Observer # Use polling for better compatibility
from watchdog.events import FileSystemEventHandler, FileCreatedEvent
# --- Import from local modules ---
try:
# Assuming main.py is in the same directory
from main import run_processing, setup_logging, ConfigurationError, AssetProcessingError
except ImportError as e:
print(f"ERROR: Failed to import required functions/classes from main.py: {e}")
print("Ensure main.py is in the same directory as monitor.py.")
sys.exit(1)
# --- Configuration ---
# All settings are read from environment variables with container-friendly
# defaults, so the monitor can be reconfigured per deployment (e.g. via
# docker-compose) without code changes.
INPUT_DIR = Path(os.environ.get('INPUT_DIR', '/data/input'))        # directory watched for new archives
OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', '/data/output'))     # passed to run_processing() as its output dir
PROCESSED_DIR = Path(os.environ.get('PROCESSED_DIR', '/data/processed'))  # successful/skipped inputs end up here
ERROR_DIR = Path(os.environ.get('ERROR_DIR', '/data/error'))        # failed/rejected inputs end up here
LOG_LEVEL_STR = os.environ.get('LOG_LEVEL', 'INFO').upper()
# NOTE(review): int() raises ValueError on a malformed env value, so a bad
# setting fails fast at import time rather than mid-run.
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '5'))  # seconds; intended polling interval for the observer
PROCESS_DELAY = int(os.environ.get('PROCESS_DELAY', '2'))  # settle time before touching a newly detected file
# Default workers for monitor - can be overridden if needed via env var.
# Half the CPU count, clamped to at least 1; os.cpu_count() may return None.
DEFAULT_WORKERS = max(1, os.cpu_count() // 2 if os.cpu_count() else 1)
NUM_WORKERS = int(os.environ.get('NUM_WORKERS', str(DEFAULT_WORKERS)))

# --- Logging Setup ---
# Unknown LOG_LEVEL strings silently fall back to INFO.
log_level = getattr(logging, LOG_LEVEL_STR, logging.INFO)
# Use the setup_logging from main.py but configure the level directly
# We don't have a 'verbose' flag here, so call basicConfig directly
log_format = '%(asctime)s [%(levelname)-8s] %(name)s: %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(level=log_level, format=log_format, datefmt=date_format, handlers=[logging.StreamHandler(sys.stdout)])
log = logging.getLogger("monitor")

# Log the effective configuration once at startup for operator visibility.
log.info(f"Logging level set to: {logging.getLevelName(log_level)}")
log.info(f"Monitoring Input Directory: {INPUT_DIR}")
log.info(f"Output Directory: {OUTPUT_DIR}")
log.info(f"Processed Files Directory: {PROCESSED_DIR}")
log.info(f"Error Files Directory: {ERROR_DIR}")
log.info(f"Polling Interval: {POLL_INTERVAL}s")
log.info(f"Processing Delay: {PROCESS_DELAY}s")
log.info(f"Max Workers: {NUM_WORKERS}")

# --- Preset Validation ---
# Presets live in a "Presets" folder next to this script; validate_preset()
# checks for "<name>.json" inside it.
PRESET_DIR = Path(__file__).parent / "Presets"
# Matches "[preset]_anything.ext" (brackets optional) for .zip/.rar/.7z;
# group(1) captures the preset name.
PRESET_FILENAME_REGEX = re.compile(r"^\[?([a-zA-Z0-9_-]+)\]?_.*\.(zip|rar|7z)$", re.IGNORECASE)
def validate_preset(preset_name: str) -> bool:
    """Return True iff a JSON preset file named ``<preset_name>.json`` exists in PRESET_DIR.

    An empty/falsy name is rejected immediately; a missing file is logged
    as a warning before returning False.
    """
    if not preset_name:
        return False
    preset_file = PRESET_DIR / f"{preset_name}.json"
    if preset_file.is_file():
        return True
    log.warning(f"Preset file not found: {preset_file}")
    return False
# --- Watchdog Event Handler ---
class ZipHandler(FileSystemEventHandler):
    """Handles file system events for new ZIP files.

    Despite the name, .rar and .7z archives are accepted as well (see the
    suffix filter in on_created). Each detected archive is processed
    synchronously via run_processing() and then moved into the processed/
    or error/ directory depending on the outcome.
    """

    def __init__(self, input_dir: Path, output_dir: Path, processed_dir: Path, error_dir: Path):
        """Resolve all working directories and create the target directories.

        Args:
            input_dir: Directory being watched for new archives (not created here).
            output_dir: Directory handed to run_processing() for its output.
            processed_dir: Destination for successfully processed/skipped archives.
            error_dir: Destination for archives that failed or were rejected.
        """
        self.input_dir = input_dir.resolve()
        self.output_dir = output_dir.resolve()
        self.processed_dir = processed_dir.resolve()
        self.error_dir = error_dir.resolve()
        # Ensure target directories exist
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.processed_dir.mkdir(parents=True, exist_ok=True)
        self.error_dir.mkdir(parents=True, exist_ok=True)
        log.info("Handler initialized, target directories ensured.")

    def on_created(self, event: FileCreatedEvent):
        """Called when a file or directory is created.

        Flow: filter by archive extension -> wait PROCESS_DELAY seconds so
        the writer can finish -> extract and validate the preset name from
        the filename -> run_processing() -> move the archive to processed/
        or error/ based on the result counts.

        NOTE(review): this runs on the observer's dispatch thread, so the
        sleep plus the synchronous processing delay dispatch of further
        events until it returns — presumably acceptable for this workload;
        confirm if high file volume is expected.
        """
        if event.is_directory:
            return
        src_path = Path(event.src_path)
        log.debug(f"File creation event detected: {src_path}")
        # Check if the file has a supported archive extension
        supported_suffixes = ['.zip', '.rar', '.7z']
        if src_path.suffix.lower() not in supported_suffixes:
            log.debug(f"Ignoring file with unsupported extension: {src_path.name}")
            return
        # Give the producer time to finish writing before reading the file.
        log.info(f"Detected new ZIP file: {src_path.name}. Waiting {PROCESS_DELAY}s before processing...")
        time.sleep(PROCESS_DELAY)
        # Re-check if file still exists (might have been temporary)
        if not src_path.exists():
            log.warning(f"File disappeared after delay: {src_path.name}")
            return
        log.info(f"Processing file: {src_path.name}")
        # --- Extract Preset Name ---
        match = PRESET_FILENAME_REGEX.match(src_path.name)
        if not match:
            log.warning(f"Filename '{src_path.name}' does not match expected format '[preset]_filename.zip'. Ignoring.")
            # Optionally move to an 'ignored' or 'error' directory? For now, leave it.
            return
        preset_name = match.group(1)
        log.info(f"Extracted preset name: '{preset_name}' from {src_path.name}")
        # --- Validate Preset ---
        if not validate_preset(preset_name):
            log.error(f"Preset '{preset_name}' is not valid (missing {PRESET_DIR / f'{preset_name}.json'}). Ignoring file {src_path.name}.")
            # Move to error dir if preset is invalid? Let's do that.
            self.move_file(src_path, self.error_dir, "invalid_preset")
            return
        # --- Run Processing ---
        try:
            log.info(f"Starting asset processing for '{src_path.name}' using preset '{preset_name}'...")
            # run_processing expects a list of inputs
            results = run_processing(
                valid_inputs=[str(src_path)],
                preset_name=preset_name,
                output_dir_for_processor=str(self.output_dir), # Pass absolute output path
                overwrite=False, # Default to no overwrite for monitored files? Or make configurable? Let's default to False.
                num_workers=NUM_WORKERS
            )
            # --- Handle Results ---
            # Check overall status based on counts; precedence is
            # pool_error > failed > processed > skipped > unknown.
            # NOTE(review): the shape of `results` (keys "processed",
            # "skipped", "failed", "pool_error", "results_list") is defined
            # by run_processing in main.py — verify against that module.
            processed = results.get("processed", 0)
            skipped = results.get("skipped", 0)
            failed = results.get("failed", 0)
            pool_error = results.get("pool_error")
            if pool_error:
                log.error(f"Processing pool error for {src_path.name}: {pool_error}")
                self.move_file(src_path, self.error_dir, "pool_error")
            elif failed > 0:
                log.error(f"Processing failed for {src_path.name}. Check worker logs for details.")
                # Log specific errors if available in results_list
                for res_path, status, err_msg in results.get("results_list", []):
                    if status == "failed":
                        log.error(f" - Failure reason: {err_msg}")
                self.move_file(src_path, self.error_dir, "processing_failed")
            elif processed > 0:
                log.info(f"Successfully processed {src_path.name}.")
                self.move_file(src_path, self.processed_dir, "processed")
            elif skipped > 0:
                log.info(f"Processing skipped for {src_path.name} (likely already exists).")
                self.move_file(src_path, self.processed_dir, "skipped")
            else:
                # Should not happen if input was valid zip
                log.warning(f"Processing finished for {src_path.name} with unexpected status (0 processed, 0 skipped, 0 failed). Moving to error dir.")
                self.move_file(src_path, self.error_dir, "unknown_status")
        except (ConfigurationError, AssetProcessingError) as e:
            # Known, classified failures raised by main.py's pipeline.
            log.error(f"Asset processing error for {src_path.name}: {e}", exc_info=True)
            self.move_file(src_path, self.error_dir, "processing_exception")
        except Exception as e:
            # Catch-all so one bad file never kills the observer thread.
            log.exception(f"Unexpected error during processing trigger for {src_path.name}: {e}")
            self.move_file(src_path, self.error_dir, "monitor_exception")

    def move_file(self, src: Path, dest_dir: Path, reason: str):
        """Safely moves a file, handling potential name collisions.

        Args:
            src: File to move; skipped (with a warning) if it no longer exists.
            dest_dir: Target directory (created in __init__).
            reason: Short tag logged to explain why the file is being moved.
        """
        if not src.exists():
            log.warning(f"Source file {src} does not exist, cannot move for reason: {reason}.")
            return
        try:
            dest_path = dest_dir / src.name
            # Handle potential name collision in destination by appending a
            # numeric suffix: name_1.zip, name_2.zip, ...
            counter = 1
            while dest_path.exists():
                dest_path = dest_dir / f"{src.stem}_{counter}{src.suffix}"
                counter += 1
                if counter > 100: # Safety break
                    log.error(f"Could not find unique name for {src.name} in {dest_dir} after 100 attempts. Aborting move.")
                    return
            log.info(f"Moving '{src.name}' to '{dest_dir.name}/' directory (Reason: {reason}). Final path: {dest_path.name}")
            shutil.move(str(src), str(dest_path))
        except Exception as e:
            # Best-effort: a failed move is logged but must not crash the handler.
            log.exception(f"Failed to move file {src.name} to {dest_dir}: {e}")
# --- Main Monitor Loop ---
if __name__ == "__main__":
    # Ensure input directory exists before scheduling the observer on it.
    if not INPUT_DIR.is_dir():
        log.error(f"Input directory does not exist or is not a directory: {INPUT_DIR}")
        log.error("Please create the directory or mount a volume correctly.")
        sys.exit(1)

    event_handler = ZipHandler(INPUT_DIR, OUTPUT_DIR, PROCESSED_DIR, ERROR_DIR)
    # BUG FIX: POLL_INTERVAL was read from the environment and logged above
    # but never applied — the PollingObserver silently used its library
    # default timeout. Pass it explicitly so the configured interval is honored.
    observer = Observer(timeout=POLL_INTERVAL)
    observer.schedule(event_handler, str(INPUT_DIR), recursive=False)  # Don't watch subdirectories
    log.info("Starting file system monitor...")
    observer.start()
    log.info("Monitor started. Press Ctrl+C to stop.")
    try:
        # Keep the main thread alive; the observer runs in a background thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        log.info("Keyboard interrupt received, stopping monitor...")
        observer.stop()
    except Exception as e:
        log.exception(f"An unexpected error occurred in the main loop: {e}")
        observer.stop()
    finally:
        # Always wait for the observer thread to finish, even if an
        # exception handler above raised.
        observer.join()
        log.info("Monitor stopped.")