Compare commits

..

No commits in common. "6e7daf260ad482cbd584b4884c7c41334c982889" and "f800bb25a9cb0c28e0adf2073b22274fb3682185" have entirely different histories.

3 changed files with 17 additions and 29 deletions

View File

@ -30,7 +30,7 @@ These stages are executed sequentially once for each asset before the core item
3. **[`MetadataInitializationStage`](processing/pipeline/stages/metadata_initialization.py:81)** (`processing/pipeline/stages/metadata_initialization.py`):
* **Responsibility**: Initializes the `context.asset_metadata` dictionary with base information derived from the [`AssetRule`](rule_structure.py:22), [`SourceRule`](rule_structure.py:40), and [`Configuration`](configuration.py:68). This includes asset name, IDs, source/output paths, timestamps, and initial status.
* **Context Interaction**: Populates `context.asset_metadata`. Initializes `context.processed_maps_details` and `context.merged_maps_details` as empty dictionaries (these are used internally by subsequent stages but are not directly part of the final `metadata.json` in their original form).
* **Context Interaction**: Populates `context.asset_metadata` and initializes empty dictionaries for `processed_maps_details` and `merged_maps_details`.
4. **[`FileRuleFilterStage`](processing/pipeline/stages/file_rule_filter.py:10)** (`processing/pipeline/stages/file_rule_filter.py`):
* **Responsibility**: Filters the [`FileRule`](rule_structure.py:5) objects associated with the asset to determine which individual files should be considered for processing. It identifies and excludes files matching "FILE_IGNORE" rules based on their `item_type`.
@ -78,14 +78,11 @@ These stages are executed sequentially once for each asset after the core item p
1. **[`OutputOrganizationStage`](processing/pipeline/stages/output_organization.py:14)** (`processing/pipeline/stages/output_organization.py`):
* **Responsibility**: Determines the final output paths for all processed maps (including variants) and extra files based on configured patterns. It copies the temporary files generated by the core stages to these final destinations, creating directories as needed and respecting overwrite settings.
* **Context Interaction**: Reads from `context.processed_maps_details`, `context.files_to_process` (for 'EXTRA' files), `context.output_base_path`, and [`Configuration`](configuration.py:68). Updates entries in `context.processed_maps_details` with organization status. Populates `context.asset_metadata['maps']` with the final map structure:
* The `maps` object is a dictionary where keys are standard map types (e.g., "COL", "REFL").
* Each entry contains a `variant_paths` dictionary, where keys are resolution strings (e.g., "8K", "4K") and values are the filenames of the map variants (relative to the asset's output directory).
It also populates `context.asset_metadata['final_output_files']` with a list of absolute paths to all generated files (this list itself is not saved in the final `metadata.json`).
* **Context Interaction**: Reads from `context.processed_maps_details` (using the "MAP_" prefixed `internal_map_type` to get the "standard type" via `get_filename_friendly_map_type` for output naming), `context.files_to_process` (for 'EXTRA' files), `context.output_base_path`, and [`Configuration`](configuration.py:68). Updates entries in `context.processed_maps_details` with final paths and organization status. Populates `context.asset_metadata['final_output_files']`. (Note: Legacy code for `'Processed_With_Variants'` status has been removed from this stage).
2. **[`MetadataFinalizationAndSaveStage`](processing/pipeline/stages/metadata_finalization_save.py:14)** (`processing/pipeline/stages/metadata_finalization_save.py`):
* **Responsibility**: Finalizes the `context.asset_metadata` (setting final status based on flags). It determines the save path for the metadata file based on configuration and patterns, serializes the `context.asset_metadata` (which now contains the structured `maps` data from `OutputOrganizationStage`) to JSON, and saves the `metadata.json` file.
* **Context Interaction**: Reads from `context.asset_metadata` (including the `maps` structure), `context.output_base_path`, and [`Configuration`](configuration.py:68). Before saving, it explicitly removes the `final_output_files` key from `context.asset_metadata`. The `processing_end_time` is also no longer added. The `metadata.json` file is written, and `context.asset_metadata` is updated with its final path and status. The older `processed_maps_details` and `merged_maps_details` from the context are not directly included in the JSON.
* **Responsibility**: Finalizes the `context.asset_metadata` (setting end time, final status based on flags). It restructures the processed map details for inclusion, determines the save path for the metadata file based on configuration and patterns, serializes the metadata to JSON, and saves the `metadata.json` file to the final output location.
* **Context Interaction**: Reads from `context.asset_metadata`, `context.processed_maps_details`, `context.merged_maps_details`, `context.output_base_path`, and [`Configuration`](configuration.py:68). Writes the `metadata.json` file and updates `context.asset_metadata` with its final path and status.
## External Steps

View File

@ -41,7 +41,7 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
# Check Skip Flag
if context.status_flags.get('skip_asset'):
context.asset_metadata['status'] = "Skipped"
# context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
context.asset_metadata['notes'] = context.status_flags.get('skip_reason', 'Skipped early in pipeline')
logger.info(
f"Asset '{asset_name_for_log}': Marked as skipped. Reason: {context.asset_metadata['notes']}"
@ -51,7 +51,7 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
# However, if we are here, asset_metadata IS initialized.
# A. Finalize Metadata
# context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
# Determine final status (if not already set to Skipped)
if context.asset_metadata.get('status') != "Skipped":
@ -115,8 +115,8 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
restructured_processed_maps[map_key] = new_map_entry
# Assign the restructured details. Note: 'processed_map_details' (singular 'map') is the key in asset_metadata.
# context.asset_metadata['processed_map_details'] = restructured_processed_maps
# context.asset_metadata['merged_map_details'] = getattr(context, 'merged_maps_details', {})
context.asset_metadata['processed_map_details'] = restructured_processed_maps
context.asset_metadata['merged_map_details'] = getattr(context, 'merged_maps_details', {})
# (Optional) Add a list of all temporary files
# context.asset_metadata['temporary_files'] = getattr(context, 'temporary_files', []) # Assuming this is populated elsewhere
@ -203,8 +203,6 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
return [make_serializable(i) for i in data]
return data
# final_output_files is populated by OutputOrganizationStage. Explicitly remove it as per user request.
context.asset_metadata.pop('final_output_files', None)
serializable_metadata = make_serializable(context.asset_metadata)
with open(metadata_save_path, 'w') as f:

View File

@ -61,10 +61,8 @@ class OutputOrganizationStage(ProcessingStage):
if saved_files_info and isinstance(saved_files_info, list) and len(saved_files_info) > 0:
logger.debug(f"Asset '{asset_name_for_log}': Organizing {len(saved_files_info)} variants for map key '{processed_map_key}' (map type: {base_map_type}) from SaveVariantsStage.")
# Use base_map_type (e.g., "COL") as the key for the map entry
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(base_map_type, {})
# map_type is now the key, so no need to store it inside the entry
# map_metadata_entry['map_type'] = base_map_type
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(processed_map_key, {})
map_metadata_entry['map_type'] = base_map_type
map_metadata_entry.setdefault('variant_paths', {}) # Initialize if not present
processed_any_variant_successfully = False
@ -116,8 +114,8 @@ class OutputOrganizationStage(ProcessingStage):
# Optionally update variant_detail status if needed
# Store relative path in metadata
# Store only the filename, as it's relative to the metadata.json location
map_metadata_entry['variant_paths'][variant_resolution_key] = output_filename_variant
relative_final_variant_path_str = str(Path(relative_dir_path_str_variant) / Path(output_filename_variant))
map_metadata_entry['variant_paths'][variant_resolution_key] = relative_final_variant_path_str
processed_any_variant_successfully = True
except Exception as e:
@ -184,16 +182,11 @@ class OutputOrganizationStage(ProcessingStage):
details['final_output_path'] = str(final_path)
# Update asset_metadata for metadata.json
# Use base_map_type (e.g., "COL") as the key for the map entry
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(base_map_type, {})
# map_type is now the key, so no need to store it inside the entry
# map_metadata_entry['map_type'] = base_map_type
# Store single path in variant_paths, keyed by its resolution string
# Store only the filename, as it's relative to the metadata.json location
map_metadata_entry.setdefault('variant_paths', {})[resolution_str] = output_filename
# Remove old cleanup logic, as variant_paths is now the standard
# if 'variant_paths' in map_metadata_entry:
# del map_metadata_entry['variant_paths']
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(processed_map_key, {})
map_metadata_entry['map_type'] = base_map_type
map_metadata_entry['path'] = str(Path(relative_dir_path_str) / Path(output_filename)) # Store relative path
if 'variant_paths' in map_metadata_entry: # Clean up variant paths if present from previous runs
del map_metadata_entry['variant_paths']
except Exception as e:
logger.error(f"Asset '{asset_name_for_log}': Failed to copy {temp_file_path} for map key '{processed_map_key}'. Error: {e}", exc_info=True)