From 528d9be47f6315c39bd90c5c2596469ca41854f2 Mon Sep 17 00:00:00 2001 From: Rusfort Date: Mon, 12 May 2025 23:03:26 +0200 Subject: [PATCH] Closer to feature parity - missing merge still --- processing/pipeline/asset_context.py | 1 + processing/pipeline/orchestrator.py | 27 ++++++++- .../pipeline/stages/output_organization.py | 57 ++----------------- .../stages/prepare_processing_items.py | 8 ++- processing/pipeline/stages/save_variants.py | 1 + processing/utils/image_saving_utils.py | 22 +++++-- 6 files changed, 54 insertions(+), 62 deletions(-) diff --git a/processing/pipeline/asset_context.py b/processing/pipeline/asset_context.py index b195927..f6363e5 100644 --- a/processing/pipeline/asset_context.py +++ b/processing/pipeline/asset_context.py @@ -72,6 +72,7 @@ class SaveVariantsInput: png_compression_level: int jpg_quality: int output_filename_pattern: str + resolution_threshold_for_jpg: Optional[int] # Added for JPG conversion # Output for SaveVariantsStage @dataclass diff --git a/processing/pipeline/orchestrator.py b/processing/pipeline/orchestrator.py index 6765506..5cdd88b 100644 --- a/processing/pipeline/orchestrator.py +++ b/processing/pipeline/orchestrator.py @@ -200,13 +200,27 @@ class PipelineOrchestrator: try: # 1. Process (Load/Merge + Transform) if isinstance(item, FileRule): + if item.item_type == 'EXTRA': + log.debug(f"{item_log_prefix}: Skipping image processing for EXTRA FileRule '{item.file_path}'.") + # Add a basic entry to processed_maps_details to acknowledge it was seen + context.processed_maps_details[item.file_path] = { + "status": "Skipped (EXTRA file)", + "internal_map_type": "EXTRA", + "source_file": str(item.file_path) + } + continue # Skip to the next item item_key = item.file_path # Use file_path string as key log.debug(f"{item_log_prefix}: Processing FileRule '{item.file_path}'...") processed_data = self._regular_processor_stage.execute(context, item) elif isinstance(item, MergeTaskDefinition): item_key = item.task_key # Use task_key string as key - log.debug(f"{item_log_prefix}: Processing MergeTask '{item_key}'...") + log.info(f"{item_log_prefix}: Executing MergedTaskProcessorStage for MergeTask '{item_key}'...") # Log call processed_data = self._merged_processor_stage.execute(context, item) + # Log status/error from merge processor + if processed_data: + log.info(f"{item_log_prefix}: MergedTaskProcessorStage result - Status: {processed_data.status}, Error: {processed_data.error_message}") + else: + log.warning(f"{item_log_prefix}: MergedTaskProcessorStage returned None for MergeTask '{item_key}'.") else: log.warning(f"{item_log_prefix}: Unknown item type '{type(item)}'. Skipping.") item_key = f"unknown_item_{item_index}" @@ -230,6 +244,8 @@ class PipelineOrchestrator: # 2. Scale (Optional) scaling_mode = getattr(context.config_obj, "INITIAL_SCALING_MODE", "NONE") if scaling_mode != "NONE" and current_image_data is not None and current_image_data.size > 0: + if isinstance(item, MergeTaskDefinition): # Log scaling call for merge tasks + log.info(f"{item_log_prefix}: Calling InitialScalingStage for MergeTask '{item_key}' (Mode: {scaling_mode})...") log.debug(f"{item_log_prefix}: Applying initial scaling (Mode: {scaling_mode})...") scale_input = InitialScalingInput( image_data=current_image_data, @@ -255,6 +271,8 @@ class PipelineOrchestrator: # Don't mark as asset error, just skip this item's saving continue # Next item + if isinstance(item, MergeTaskDefinition): # Log save call for merge tasks + log.info(f"{item_log_prefix}: Calling SaveVariantsStage for MergeTask '{item_key}'...") log.debug(f"{item_log_prefix}: Saving variants...") # Prepare input for save stage internal_map_type = processed_data.final_internal_map_type if isinstance(processed_data, ProcessedRegularMapData) else processed_data.output_map_type @@ -282,8 +300,12 @@ class PipelineOrchestrator: png_compression_level=context.config_obj.png_compression_level, jpg_quality=context.config_obj.jpg_quality, output_filename_pattern=context.config_obj.output_filename_pattern, + resolution_threshold_for_jpg=getattr(context.config_obj, "RESOLUTION_THRESHOLD_FOR_JPG", None) # Added ) saved_data = self._save_stage.execute(save_input) + # Log saved_data for merge tasks + if isinstance(item, MergeTaskDefinition): + log.info(f"{item_log_prefix}: SaveVariantsStage result for MergeTask '{item_key}' - Status: {saved_data.status if saved_data else 'N/A'}, Saved Files: {len(saved_data.saved_files_details) if saved_data else 0}") # Check save status and finalize item result if saved_data and saved_data.status.startswith("Processed"): @@ -300,6 +322,9 @@ class PipelineOrchestrator: # Add source file if regular map "source_file": str(processed_data.source_file_path) if isinstance(processed_data, ProcessedRegularMapData) else None, } + # Log final details addition for merge tasks + if isinstance(item, MergeTaskDefinition): + log.info(f"{item_log_prefix}: Adding final details to context.processed_maps_details for MergeTask '{item_key}'. Details: {final_details}") context.processed_maps_details[item_key] = final_details else: error_msg = saved_data.error_message if saved_data else "Save stage returned None" diff --git a/processing/pipeline/stages/output_organization.py b/processing/pipeline/stages/output_organization.py index a17a8ba..205fda0 100644 --- a/processing/pipeline/stages/output_organization.py +++ b/processing/pipeline/stages/output_organization.py @@ -278,59 +278,10 @@ class OutputOrganizationStage(ProcessingStage): else: logger.debug(f"Asset '{asset_name_for_log}': No processed individual maps to organize.") - # B. Organize Merged Maps - if context.merged_maps_details: - logger.debug(f"Asset '{asset_name_for_log}': Organizing {len(context.merged_maps_details)} merged map(s).") - for merge_op_id, details in context.merged_maps_details.items(): # Use merge_op_id - if details.get('status') != 'Processed' or not details.get('temp_merged_file'): - logger.debug(f"Asset '{asset_name_for_log}': Skipping merge op id '{merge_op_id}' due to status '{details.get('status')}' or missing temp file.") - continue - - temp_file_path = Path(details['temp_merged_file']) - map_type = details.get('map_type', 'unknown_merged_map') # This is the output_map_type of the merge rule - # Merged maps might not have a simple 'resolution' token like individual maps. - # We'll use a placeholder or derive if possible. - resolution_str = details.get('merged_resolution_name', 'mergedRes') - - - token_data_merged = { - "assetname": asset_name_for_log, - "supplier": context.effective_supplier or "DefaultSupplier", - "maptype": map_type, - "resolution": resolution_str, - "ext": temp_file_path.suffix.lstrip('.'), - "incrementingvalue": getattr(context, 'incrementing_value', None), - "sha5": getattr(context, 'sha5_value', None) - } - token_data_merged_cleaned = {k: v for k, v in token_data_merged.items() if v is not None} - - output_filename_merged = generate_path_from_pattern(output_filename_pattern_config, token_data_merged_cleaned) - - try: - relative_dir_path_str_merged = generate_path_from_pattern( - pattern_string=output_dir_pattern, - token_data=token_data_merged_cleaned - ) - final_path_merged = Path(context.output_base_path) / Path(relative_dir_path_str_merged) / Path(output_filename_merged) - final_path_merged.parent.mkdir(parents=True, exist_ok=True) - - if final_path_merged.exists() and not overwrite_existing: - logger.info(f"Asset '{asset_name_for_log}': Output file {final_path_merged} exists and overwrite is disabled. Skipping copy for merged map.") - else: - shutil.copy2(temp_file_path, final_path_merged) - logger.info(f"Asset '{asset_name_for_log}': Copied merged map {temp_file_path} to {final_path_merged}") - final_output_files.append(str(final_path_merged)) - - context.merged_maps_details[merge_op_id]['final_output_path'] = str(final_path_merged) - context.merged_maps_details[merge_op_id]['status'] = 'Organized' - - except Exception as e: - logger.error(f"Asset '{asset_name_for_log}': Failed to copy merged map {temp_file_path} to destination for merge op id '{merge_op_id}'. Error: {e}", exc_info=True) - context.status_flags['output_organization_error'] = True - context.asset_metadata['status'] = "Failed (Output Organization Error)" - context.merged_maps_details[merge_op_id]['status'] = 'Organization Failed' - else: - logger.debug(f"Asset '{asset_name_for_log}': No merged maps to organize.") + # B. Organize Merged Maps (OBSOLETE BLOCK - Merged maps are handled by the main loop processing context.processed_maps_details) + # The log "No merged maps to organize" will no longer appear from here. + # If merged maps are not appearing, the issue is likely that they are not being added + # to context.processed_maps_details with 'saved_files_info' by the orchestrator/SaveVariantsStage. # C. Organize Extra Files (e.g., previews, text files) logger.debug(f"Asset '{asset_name_for_log}': Checking for EXTRA files to organize.") diff --git a/processing/pipeline/stages/prepare_processing_items.py b/processing/pipeline/stages/prepare_processing_items.py index adfaf16..cee6c2e 100644 --- a/processing/pipeline/stages/prepare_processing_items.py +++ b/processing/pipeline/stages/prepare_processing_items.py @@ -65,11 +65,13 @@ class PrepareProcessingItemsStage(ProcessingStage): task_key = f"merged_task_{task_idx}" # Basic validation for merge task data (can be expanded) if not task_data.get('output_map_type') or not task_data.get('merge_rule_config'): - log.warning(f"Asset '{asset_name_for_log}', Task Index {task_idx}: Skipping merge task due to missing 'output_map_type' or 'merge_rule_config'.") + log.warning(f"Asset '{asset_name_for_log}', Task Index {task_idx}: Skipping merge task due to missing 'output_map_type' or 'merge_rule_config'. Task data: {task_data}") continue # Skip this specific task - items_to_process.append(MergeTaskDefinition(task_data=task_data, task_key=task_key)) + merge_def = MergeTaskDefinition(task_data=task_data, task_key=task_key) + log.info(f"Asset '{asset_name_for_log}': Identified and adding Merge Task: Key='{merge_def.task_key}', OutputType='{task_data.get('output_map_type', 'N/A')}'") + items_to_process.append(merge_def) else: - log.warning(f"Asset '{asset_name_for_log}': Item at index {task_idx} in '{merged_tasks_attr_name}' is not a dictionary. Skipping.") + log.warning(f"Asset '{asset_name_for_log}': Item at index {task_idx} in '{merged_tasks_attr_name}' is not a dictionary. Skipping. Item: {task_data}") log.debug(f"Asset '{asset_name_for_log}': Added {len(merged_tasks_list)} potential MergeTaskDefinition items.") else: log.warning(f"Asset '{asset_name_for_log}': Attribute '{merged_tasks_attr_name}' is not a list. Skipping merge tasks.") diff --git a/processing/pipeline/stages/save_variants.py b/processing/pipeline/stages/save_variants.py index 426fb15..482b1cc 100644 --- a/processing/pipeline/stages/save_variants.py +++ b/processing/pipeline/stages/save_variants.py @@ -61,6 +61,7 @@ class SaveVariantsStage(ProcessingStage): "jpg_quality": input_data.jpg_quality, "output_filename_pattern_tokens": input_data.output_filename_pattern_tokens, "output_filename_pattern": input_data.output_filename_pattern, + "resolution_threshold_for_jpg": input_data.resolution_threshold_for_jpg, # Added } log.debug(f"{log_prefix}: Calling save_image_variants utility.") diff --git a/processing/utils/image_saving_utils.py b/processing/utils/image_saving_utils.py index 66591a8..01ec7a1 100644 --- a/processing/utils/image_saving_utils.py +++ b/processing/utils/image_saving_utils.py @@ -33,6 +33,7 @@ def save_image_variants( jpg_quality: int, output_filename_pattern_tokens: Dict[str, Any], # Must include 'output_base_directory': Path and 'asset_name': str output_filename_pattern: str, + resolution_threshold_for_jpg: Optional[int] = None, # Added # Consider adding ipu or relevant parts of it if not importing globally ) -> List[Dict[str, Any]]: """ @@ -113,8 +114,10 @@ def save_image_variants( else: logger.error(f"Unsupported target bit depth: {target_bit_depth}. Defaulting to 8-bit format.") output_ext = output_format_8bit.lstrip('.').lower() + + current_output_ext = output_ext # Store the initial extension based on bit depth - logger.info(f"SaveImageVariants: Determined target bit depth: {target_bit_depth}, Output format: {output_ext} for map type {base_map_type}") + logger.info(f"SaveImageVariants: Determined target bit depth: {target_bit_depth}, Initial output format: {current_output_ext} for map type {base_map_type}") # 4. Generate and Save Resolution Variants # Sort resolutions by max dimension descending @@ -167,7 +170,16 @@ def save_image_variants( current_tokens = output_filename_pattern_tokens.copy() current_tokens['maptype'] = base_map_type current_tokens['resolution'] = res_key - current_tokens['ext'] = output_ext + + # Determine final extension for this variant, considering JPG threshold + final_variant_ext = current_output_ext + if target_bit_depth == 8 and resolution_threshold_for_jpg is not None and \ + max(target_w_res, target_h_res) > resolution_threshold_for_jpg and \ + current_output_ext == 'png': # Only convert if current 8-bit is PNG + final_variant_ext = 'jpg' + logger.info(f"SaveImageVariants: Overriding 8-bit PNG to JPG for {base_map_type} {res_key} due to resolution {max(target_w_res, target_h_res)}px > threshold {resolution_threshold_for_jpg}px.") + + current_tokens['ext'] = final_variant_ext try: # Replace placeholders in the pattern @@ -196,11 +208,11 @@ def save_image_variants( # Prepare Save Parameters save_params_cv2 = [] - if output_ext == 'jpg': + if final_variant_ext == 'jpg': # Check against final_variant_ext save_params_cv2.append(cv2.IMWRITE_JPEG_QUALITY) save_params_cv2.append(jpg_quality) logger.debug(f"SaveImageVariants: Using JPG quality: {jpg_quality} for {base_map_type} {res_key}") - elif output_ext == 'png': + elif final_variant_ext == 'png': # Check against final_variant_ext save_params_cv2.append(cv2.IMWRITE_PNG_COMPRESSION) save_params_cv2.append(png_compression_level) logger.debug(f"SaveImageVariants: Using PNG compression level: {png_compression_level} for {base_map_type} {res_key}") @@ -237,7 +249,7 @@ def save_image_variants( saved_file_details.append({ 'path': str(output_path), 'resolution_key': res_key, - 'format': output_ext, + 'format': final_variant_ext, # Log the actual saved format 'bit_depth': target_bit_depth, 'dimensions': (target_w_res, target_h_res) })