383 lines
18 KiB
Python

import pytest
from unittest import mock
from pathlib import Path
import uuid
import shutil # For checking rmtree
import tempfile # For mocking mkdtemp
from processing.pipeline.orchestrator import PipelineOrchestrator
from processing.pipeline.asset_context import AssetProcessingContext
from processing.pipeline.stages.base_stage import ProcessingStage # For mocking stages
from rule_structure import SourceRule, AssetRule, FileRule
from configuration import Configuration, GeneralSettings
# Mock Stage that modifies context
class MockPassThroughStage(ProcessingStage):
def __init__(self, stage_name="mock_stage"):
self.stage_name = stage_name
self.execute_call_count = 0
self.contexts_called_with = [] # To store contexts for verification
def execute(self, context: AssetProcessingContext) -> AssetProcessingContext:
self.execute_call_count += 1
self.contexts_called_with.append(context)
# Optionally, modify context for testing
context.asset_metadata[f'{self.stage_name}_executed'] = True
if self.stage_name == "skipper_stage": # Example conditional logic
context.status_flags['skip_asset'] = True
context.status_flags['skip_reason'] = "Skipped by skipper_stage"
elif self.stage_name == "error_stage": # Example error-raising stage
raise ValueError("Simulated error in error_stage")
# Simulate status update based on stage execution
if not context.status_flags.get('skip_asset') and not context.status_flags.get('asset_failed'):
context.asset_metadata['status'] = "Processed" # Default to processed if not skipped/failed
return context
def create_orchestrator_test_config() -> mock.MagicMock:
mock_config = mock.MagicMock(spec=Configuration)
mock_config.general_settings = mock.MagicMock(spec=GeneralSettings)
mock_config.general_settings.temp_dir_override = None # Default, can be overridden in tests
# Add other config details if orchestrator or stages depend on them directly
return mock_config
def create_orchestrator_test_asset_rule(name: str, num_file_rules: int = 1) -> mock.MagicMock:
asset_rule = mock.MagicMock(spec=AssetRule)
asset_rule.name = name
asset_rule.id = uuid.uuid4()
asset_rule.source_path = Path(f"/fake/source/{name}") # Using Path object
asset_rule.file_rules = [mock.MagicMock(spec=FileRule) for _ in range(num_file_rules)]
asset_rule.enabled = True
asset_rule.map_types = {} # Initialize as dict
asset_rule.material_name_scheme = "{asset_name}"
asset_rule.texture_name_scheme = "{asset_name}_{map_type}"
asset_rule.output_path_scheme = "{source_name}/{asset_name}"
# ... other necessary AssetRule fields ...
return asset_rule
def create_orchestrator_test_source_rule(name: str, num_assets: int = 1, asset_names: list = None) -> mock.MagicMock:
source_rule = mock.MagicMock(spec=SourceRule)
source_rule.name = name
source_rule.id = uuid.uuid4()
if asset_names:
source_rule.assets = [create_orchestrator_test_asset_rule(an) for an in asset_names]
else:
source_rule.assets = [create_orchestrator_test_asset_rule(f"Asset_{i+1}_in_{name}") for i in range(num_assets)]
source_rule.enabled = True
source_rule.source_path = Path(f"/fake/source_root/{name}") # Using Path object
# ... other necessary SourceRule fields ...
return source_rule
# --- Test Cases for PipelineOrchestrator.process_source_rule() ---
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
def test_orchestrator_basic_flow_mock_stages(mock_mkdtemp, mock_rmtree):
mock_mkdtemp.return_value = "/fake/engine_temp_dir_path" # Path for mkdtemp
config = create_orchestrator_test_config()
stage1 = MockPassThroughStage("stage1")
stage2 = MockPassThroughStage("stage2")
orchestrator = PipelineOrchestrator(config_obj=config, stages=[stage1, stage2])
source_rule = create_orchestrator_test_source_rule("MySourceRule", num_assets=2)
asset1_name = source_rule.assets[0].name
asset2_name = source_rule.assets[1].name
# Mock asset_metadata to be updated by stages for status check
# The MockPassThroughStage already sets a 'status' = "Processed" if not skipped/failed
# and adds '{stage_name}_executed' = True to asset_metadata.
results = orchestrator.process_source_rule(
source_rule, Path("/ws"), Path("/out"), False, "inc_val_123", "sha_val_abc"
)
assert stage1.execute_call_count == 2 # Called for each asset
assert stage2.execute_call_count == 2 # Called for each asset
assert asset1_name in results['processed']
assert asset2_name in results['processed']
assert not results['skipped']
assert not results['failed']
# Verify context modifications by stages
for i in range(2): # For each asset
# Stage 1 context checks
s1_context_asset = stage1.contexts_called_with[i]
assert s1_context_asset.asset_metadata.get('stage1_executed') is True
assert s1_context_asset.asset_metadata.get('stage2_executed') is None # Stage 2 not yet run for this asset
# Stage 2 context checks
s2_context_asset = stage2.contexts_called_with[i]
assert s2_context_asset.asset_metadata.get('stage1_executed') is True # From stage 1
assert s2_context_asset.asset_metadata.get('stage2_executed') is True
assert s2_context_asset.asset_metadata.get('status') == "Processed"
mock_mkdtemp.assert_called_once()
# The orchestrator creates a subdirectory within the mkdtemp path
expected_temp_path = Path(mock_mkdtemp.return_value) / source_rule.id.hex
mock_rmtree.assert_called_once_with(expected_temp_path, ignore_errors=True)
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
def test_orchestrator_asset_skipping_by_stage(mock_mkdtemp, mock_rmtree):
mock_mkdtemp.return_value = "/fake/engine_temp_dir_path_skip"
config = create_orchestrator_test_config()
skipper_stage = MockPassThroughStage("skipper_stage") # This stage will set skip_asset = True
stage_after_skip = MockPassThroughStage("stage_after_skip")
orchestrator = PipelineOrchestrator(config_obj=config, stages=[skipper_stage, stage_after_skip])
source_rule = create_orchestrator_test_source_rule("SkipSourceRule", num_assets=1)
asset_to_skip_name = source_rule.assets[0].name
results = orchestrator.process_source_rule(
source_rule, Path("/ws_skip"), Path("/out_skip"), False, "inc_skip", "sha_skip"
)
assert skipper_stage.execute_call_count == 1 # Called for the asset
assert stage_after_skip.execute_call_count == 0 # Not called because asset was skipped
assert asset_to_skip_name in results['skipped']
assert not results['processed']
assert not results['failed']
# Verify skip reason in context if needed (MockPassThroughStage stores contexts)
skipped_context = skipper_stage.contexts_called_with[0]
assert skipped_context.status_flags['skip_asset'] is True
assert skipped_context.status_flags['skip_reason'] == "Skipped by skipper_stage"
mock_mkdtemp.assert_called_once()
expected_temp_path = Path(mock_mkdtemp.return_value) / source_rule.id.hex
mock_rmtree.assert_called_once_with(expected_temp_path, ignore_errors=True)
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
def test_orchestrator_no_assets_in_source_rule(mock_mkdtemp, mock_rmtree):
mock_mkdtemp.return_value = "/fake/engine_temp_dir_no_assets"
config = create_orchestrator_test_config()
stage1 = MockPassThroughStage("stage1_no_assets")
orchestrator = PipelineOrchestrator(config_obj=config, stages=[stage1])
source_rule = create_orchestrator_test_source_rule("NoAssetSourceRule", num_assets=0)
results = orchestrator.process_source_rule(
source_rule, Path("/ws_no_assets"), Path("/out_no_assets"), False, "inc_no", "sha_no"
)
assert stage1.execute_call_count == 0
assert not results['processed']
assert not results['skipped']
assert not results['failed']
# mkdtemp should still be called for the source rule processing, even if no assets
mock_mkdtemp.assert_called_once()
expected_temp_path = Path(mock_mkdtemp.return_value) / source_rule.id.hex
mock_rmtree.assert_called_once_with(expected_temp_path, ignore_errors=True)
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
def test_orchestrator_error_during_stage_execution(mock_mkdtemp, mock_rmtree):
mock_mkdtemp.return_value = "/fake/engine_temp_dir_error"
config = create_orchestrator_test_config()
error_stage = MockPassThroughStage("error_stage") # This stage will raise an error
stage_after_error = MockPassThroughStage("stage_after_error")
orchestrator = PipelineOrchestrator(config_obj=config, stages=[error_stage, stage_after_error])
# Test with two assets, one fails, one processes (if orchestrator continues)
# The current orchestrator's process_asset is per asset, so an error in one
# should not stop processing of other assets in the same source_rule.
source_rule = create_orchestrator_test_source_rule("ErrorSourceRule", asset_names=["AssetFails", "AssetSucceeds"])
asset_fails_name = source_rule.assets[0].name
asset_succeeds_name = source_rule.assets[1].name
# Make only the first asset's processing trigger the error
original_execute = error_stage.execute
def error_execute_side_effect(context: AssetProcessingContext):
if context.asset_rule.name == asset_fails_name:
# The MockPassThroughStage is already configured to raise ValueError for "error_stage"
# but we need to ensure it's only for the first asset.
# We can achieve this by modifying the stage_name temporarily or by checking asset_rule.name
# For simplicity, let's assume the mock stage's error logic is fine,
# and we just need to check the outcome.
# The error_stage will raise ValueError("Simulated error in error_stage")
# The orchestrator's _process_single_asset catches generic Exception.
return original_execute(context) # This will call the erroring logic
else:
# For the second asset, make it pass through without error
context.asset_metadata[f'{error_stage.stage_name}_executed'] = True
context.asset_metadata['status'] = "Processed"
return context
error_stage.execute = mock.MagicMock(side_effect=error_execute_side_effect)
# stage_after_error should still be called for the successful asset
results = orchestrator.process_source_rule(
source_rule, Path("/ws_error"), Path("/out_error"), False, "inc_err", "sha_err"
)
assert error_stage.execute.call_count == 2 # Called for both assets
# stage_after_error is only called for the asset that didn't fail in error_stage
assert stage_after_error.execute_call_count == 1
assert asset_fails_name in results['failed']
assert asset_succeeds_name in results['processed']
assert not results['skipped']
# Verify the context of the failed asset
failed_context = None
for ctx in error_stage.contexts_called_with:
if ctx.asset_rule.name == asset_fails_name:
failed_context = ctx
break
assert failed_context is not None
assert failed_context.status_flags['asset_failed'] is True
assert "Simulated error in error_stage" in failed_context.status_flags['failure_reason']
# Verify the context of the successful asset after stage_after_error
successful_context_after_s2 = None
for ctx in stage_after_error.contexts_called_with:
if ctx.asset_rule.name == asset_succeeds_name:
successful_context_after_s2 = ctx
break
assert successful_context_after_s2 is not None
assert successful_context_after_s2.asset_metadata.get('error_stage_executed') is True # from the non-erroring path
assert successful_context_after_s2.asset_metadata.get('stage_after_error_executed') is True
assert successful_context_after_s2.asset_metadata.get('status') == "Processed"
mock_mkdtemp.assert_called_once()
expected_temp_path = Path(mock_mkdtemp.return_value) / source_rule.id.hex
mock_rmtree.assert_called_once_with(expected_temp_path, ignore_errors=True)
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
def test_orchestrator_asset_processing_context_initialization(mock_mkdtemp, mock_rmtree):
mock_engine_temp_dir = "/fake/engine_temp_dir_context_init"
mock_mkdtemp.return_value = mock_engine_temp_dir
config = create_orchestrator_test_config()
mock_stage = MockPassThroughStage("context_check_stage")
orchestrator = PipelineOrchestrator(config_obj=config, stages=[mock_stage])
source_rule = create_orchestrator_test_source_rule("ContextSourceRule", num_assets=1)
asset_rule = source_rule.assets[0]
workspace_path = Path("/ws_context")
output_base_path = Path("/out_context")
incrementing_value = "inc_context_123"
sha5_value = "sha_context_abc"
orchestrator.process_source_rule(
source_rule, workspace_path, output_base_path, False, incrementing_value, sha5_value
)
assert mock_stage.execute_call_count == 1
# Retrieve the context passed to the mock stage
captured_context = mock_stage.contexts_called_with[0]
assert captured_context.source_rule == source_rule
assert captured_context.asset_rule == asset_rule
assert captured_context.workspace_path == workspace_path
# engine_temp_dir for the asset is a sub-directory of the source_rule's temp dir
# which itself is a sub-directory of the main engine_temp_dir from mkdtemp
expected_source_rule_temp_dir = Path(mock_engine_temp_dir) / source_rule.id.hex
expected_asset_temp_dir = expected_source_rule_temp_dir / asset_rule.id.hex
assert captured_context.engine_temp_dir == expected_asset_temp_dir
assert captured_context.output_base_path == output_base_path
assert captured_context.config_obj == config
assert captured_context.incrementing_value == incrementing_value
assert captured_context.sha5_value == sha5_value
# Check initial state of other context fields
assert captured_context.asset_metadata == {} # Should be empty initially for an asset
assert captured_context.status_flags == {} # Should be empty initially
assert captured_context.shared_data == {} # Should be empty initially
assert captured_context.current_files == [] # Should be empty initially
mock_mkdtemp.assert_called_once()
mock_rmtree.assert_called_once_with(expected_source_rule_temp_dir, ignore_errors=True)
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
def test_orchestrator_temp_dir_override_from_config(mock_mkdtemp, mock_rmtree):
# This test verifies that if config.general_settings.temp_dir_override is set,
# mkdtemp is NOT called, and the override path is used and cleaned up.
config = create_orchestrator_test_config()
override_temp_path_str = "/override/temp/path"
config.general_settings.temp_dir_override = override_temp_path_str
stage1 = MockPassThroughStage("stage_temp_override")
orchestrator = PipelineOrchestrator(config_obj=config, stages=[stage1])
source_rule = create_orchestrator_test_source_rule("TempOverrideRule", num_assets=1)
asset_rule = source_rule.assets[0]
results = orchestrator.process_source_rule(
source_rule, Path("/ws_override"), Path("/out_override"), False, "inc_override", "sha_override"
)
assert stage1.execute_call_count == 1
assert asset_rule.name in results['processed']
mock_mkdtemp.assert_not_called() # mkdtemp should not be called due to override
# The orchestrator should create its source-rule specific subdir within the override
expected_source_rule_temp_dir_in_override = Path(override_temp_path_str) / source_rule.id.hex
# Verify the context passed to the stage uses the overridden path structure
captured_context = stage1.contexts_called_with[0]
expected_asset_temp_dir_in_override = expected_source_rule_temp_dir_in_override / asset_rule.id.hex
assert captured_context.engine_temp_dir == expected_asset_temp_dir_in_override
# rmtree should be called on the source_rule's directory within the override path
mock_rmtree.assert_called_once_with(expected_source_rule_temp_dir_in_override, ignore_errors=True)
@mock.patch('shutil.rmtree')
@mock.patch('tempfile.mkdtemp')
def test_orchestrator_disabled_asset_rule_is_skipped(mock_mkdtemp, mock_rmtree):
mock_mkdtemp.return_value = "/fake/engine_temp_dir_disabled_asset"
config = create_orchestrator_test_config()
stage1 = MockPassThroughStage("stage_disabled_check")
orchestrator = PipelineOrchestrator(config_obj=config, stages=[stage1])
source_rule = create_orchestrator_test_source_rule("DisabledAssetSourceRule", asset_names=["EnabledAsset", "DisabledAsset"])
enabled_asset = source_rule.assets[0]
disabled_asset = source_rule.assets[1]
disabled_asset.enabled = False # Disable this asset rule
results = orchestrator.process_source_rule(
source_rule, Path("/ws_disabled"), Path("/out_disabled"), False, "inc_dis", "sha_dis"
)
assert stage1.execute_call_count == 1 # Only called for the enabled asset
assert enabled_asset.name in results['processed']
assert disabled_asset.name in results['skipped']
assert not results['failed']
# Verify context for the processed asset
assert stage1.contexts_called_with[0].asset_rule.name == enabled_asset.name
# Verify skip reason for the disabled asset (this is set by the orchestrator itself)
# The orchestrator's _process_single_asset checks asset_rule.enabled
# We need to inspect the results dictionary for the skip reason if it's stored there,
# or infer it. The current structure of `results` doesn't store detailed skip reasons directly,
# but the test ensures it's in the 'skipped' list.
# For a more detailed check, one might need to adjust how results are reported or mock deeper.
# For now, confirming it's in 'skipped' and stage1 wasn't called for it is sufficient.
mock_mkdtemp.assert_called_once()
expected_temp_path = Path(mock_mkdtemp.return_value) / source_rule.id.hex
mock_rmtree.assert_called_once_with(expected_temp_path, ignore_errors=True)