# File-listing metadata (was bare text, which is a Python syntax error): 133 lines, 7.1 KiB, Python

from typing import List, Dict, Optional
from pathlib import Path
import shutil
import tempfile
import logging
from configuration import Configuration
from rule_structure import SourceRule, AssetRule
from .asset_context import AssetProcessingContext
from .stages.base_stage import ProcessingStage
log = logging.getLogger(__name__)
class PipelineOrchestrator:
    """
    Orchestrates the processing of assets based on source rules and a series
    of processing stages.

    Each asset rule within a source rule is run through every configured
    stage in order.  A stage may end an asset's run early by raising (the
    asset is marked failed) or by setting the ``skip_asset`` status flag on
    the context (the asset is marked skipped).
    """

    def __init__(self, config_obj: Configuration, stages: List[ProcessingStage]):
        """
        Initializes the PipelineOrchestrator.

        Args:
            config_obj: The main configuration object.
            stages: A list of processing stages to be executed in order.
        """
        self.config_obj: Configuration = config_obj
        self.stages: List[ProcessingStage] = stages

    def process_source_rule(
        self,
        source_rule: SourceRule,
        workspace_path: Path,
        output_base_path: Path,
        overwrite: bool,  # Not used yet; kept in the signature for future overwrite support.
        incrementing_value: Optional[str],
        sha5_value: Optional[str],  # NOTE(review): name kept for caller compatibility; forwarded as the context's sha256_value.
    ) -> Dict[str, List[str]]:
        """
        Processes a single source rule, iterating through its asset rules and
        applying all stages.

        Args:
            source_rule: The source rule to process.
            workspace_path: The base path of the workspace (the source files,
                e.g. an extracted archive).
            output_base_path: The base path for output files.
            overwrite: Whether to overwrite existing files (not implemented yet).
            incrementing_value: An optional incrementing value for versioning
                or naming.
            sha5_value: An optional hash value for the asset; stored on the
                processing context as ``sha256_value``.

        Returns:
            A dictionary with "processed", "skipped" and "failed" lists of
            asset names summarizing the outcome for every asset rule.
        """
        overall_status: Dict[str, List[str]] = {
            "processed": [],
            "skipped": [],
            "failed": [],
        }
        engine_temp_dir_path: Optional[Path] = None
        try:
            # One temporary directory for the whole source-rule run (not per
            # asset); individual stages may create their own sub-temp dirs.
            engine_temp_dir_path = Path(
                tempfile.mkdtemp(
                    prefix="asset_processor_orchestrator_temp_",
                    dir=self.config_obj.get_temp_directory_base(),
                )
            )
            log.debug(f"PipelineOrchestrator created temporary directory: {engine_temp_dir_path}")
            for asset_rule in source_rule.assets:
                log.debug(f"Orchestrator: Processing asset '{asset_rule.name}'")
                context = self._build_context(
                    source_rule,
                    asset_rule,
                    workspace_path,
                    engine_temp_dir_path,
                    output_base_path,
                    incrementing_value,
                    sha5_value,
                )
                context = self._run_stages(context)
                self._record_status(asset_rule.name, context, overall_status)
        except Exception as e:
            log.error(f"PipelineOrchestrator.process_source_rule failed: {e}", exc_info=True)
            # A top-level error aborts the loop; mark every asset with no
            # recorded outcome as failed so the summary stays complete.
            already_recorded = set(
                overall_status["processed"] + overall_status["skipped"] + overall_status["failed"]
            )
            for asset_rule in source_rule.assets:
                if asset_rule.name not in already_recorded:
                    overall_status["failed"].append(f"{asset_rule.name} (Orchestrator Error)")
        finally:
            if engine_temp_dir_path and engine_temp_dir_path.exists():
                log.debug(f"PipelineOrchestrator cleaning up temporary directory: {engine_temp_dir_path}")
                # ignore_errors=True already suppresses removal failures, so
                # no surrounding try/except is needed (the original wrapper
                # was dead code).
                shutil.rmtree(engine_temp_dir_path, ignore_errors=True)
        return overall_status

    def _build_context(
        self,
        source_rule: SourceRule,
        asset_rule: AssetRule,
        workspace_path: Path,
        engine_temp_dir: Path,
        output_base_path: Path,
        incrementing_value: Optional[str],
        sha256_value: Optional[str],
    ) -> AssetProcessingContext:
        """Creates a fresh processing context for one asset rule."""
        return AssetProcessingContext(
            source_rule=source_rule,
            asset_rule=asset_rule,
            workspace_path=workspace_path,  # Path to the source files (e.g. extracted archive).
            engine_temp_dir=engine_temp_dir,  # Shared orchestrator temp dir.
            output_base_path=output_base_path,
            effective_supplier=None,  # Set later by SupplierDeterminationStage.
            asset_metadata={},  # Populated by stages.
            processed_maps_details={},  # Populated by stages.
            merged_maps_details={},  # Populated by stages.
            files_to_process=[],  # Populated by FileRuleFilterStage.
            loaded_data_cache={},  # Image-loading cache scoped to this asset.
            config_obj=self.config_obj,
            status_flags={"skip_asset": False, "asset_failed": False},  # Common flags.
            incrementing_value=incrementing_value,
            sha256_value=sha256_value,
        )

    def _run_stages(self, context: AssetProcessingContext) -> AssetProcessingContext:
        """
        Executes all stages on the context in order, stopping early when a
        stage raises (asset marked failed) or sets the ``skip_asset`` flag.

        Returns:
            The context as left by the last stage that ran.
        """
        asset_name = context.asset_rule.name
        for stage_idx, stage in enumerate(self.stages):
            stage_name = stage.__class__.__name__  # Hoisted: used in up to three messages below.
            log.debug(f"Asset '{asset_name}': Executing stage {stage_idx + 1}/{len(self.stages)}: {stage_name}")
            try:
                context = stage.execute(context)
            except Exception as e:
                log.error(f"Asset '{asset_name}': Error during stage '{stage_name}': {e}", exc_info=True)
                context.status_flags["asset_failed"] = True
                context.asset_metadata["status"] = f"Failed: Error in stage {stage_name}"
                context.asset_metadata["error_message"] = str(e)
                break  # Stop processing stages for this asset on error.
            if context.status_flags.get("skip_asset"):
                log.info(f"Asset '{asset_name}': Skipped by stage '{stage_name}'. Reason: {context.status_flags.get('skip_reason', 'N/A')}")
                break  # Skip remaining stages for this asset.
        return context

    def _record_status(
        self,
        asset_name: str,
        context: AssetProcessingContext,
        overall_status: Dict[str, List[str]],
    ) -> None:
        """Classifies one asset's final outcome into processed/skipped/failed."""
        if context.status_flags.get("skip_asset"):
            overall_status["skipped"].append(asset_name)
        elif context.status_flags.get("asset_failed") or str(context.asset_metadata.get("status", "")).startswith("Failed"):
            overall_status["failed"].append(asset_name)
        elif context.asset_metadata.get("status") == "Processed":
            overall_status["processed"].append(asset_name)
        else:
            # No stage produced a recognizable terminal status; treat this as
            # a failure rather than silently dropping the asset.
            log.warning(f"Asset '{asset_name}': Unknown status after pipeline execution. Metadata status: '{context.asset_metadata.get('status')}'. Marking as failed.")
            overall_status["failed"].append(f"{asset_name} (Unknown Status: {context.asset_metadata.get('status')})")
        log.debug(f"Asset '{asset_name}' final status: {context.asset_metadata.get('status', 'N/A')}, Flags: {context.status_flags}")