Compare commits
2 Commits
f800bb25a9
...
6e7daf260a
| Author | SHA1 | Date | |
|---|---|---|---|
| 6e7daf260a | |||
| 1cd81cb87a |
@ -30,7 +30,7 @@ These stages are executed sequentially once for each asset before the core item
|
||||
|
||||
3. **[`MetadataInitializationStage`](processing/pipeline/stages/metadata_initialization.py:81)** (`processing/pipeline/stages/metadata_initialization.py`):
|
||||
* **Responsibility**: Initializes the `context.asset_metadata` dictionary with base information derived from the [`AssetRule`](rule_structure.py:22), [`SourceRule`](rule_structure.py:40), and [`Configuration`](configuration.py:68). This includes asset name, IDs, source/output paths, timestamps, and initial status.
|
||||
* **Context Interaction**: Populates `context.asset_metadata` and initializes empty dictionaries for `processed_maps_details` and `merged_maps_details`.
|
||||
* **Context Interaction**: Populates `context.asset_metadata`. Initializes `context.processed_maps_details` and `context.merged_maps_details` as empty dictionaries (these are used internally by subsequent stages but are not directly part of the final `metadata.json` in their original form).
|
||||
|
||||
4. **[`FileRuleFilterStage`](processing/pipeline/stages/file_rule_filter.py:10)** (`processing/pipeline/stages/file_rule_filter.py`):
|
||||
* **Responsibility**: Filters the [`FileRule`](rule_structure.py:5) objects associated with the asset to determine which individual files should be considered for processing. It identifies and excludes files matching "FILE_IGNORE" rules based on their `item_type`.
|
||||
@ -78,11 +78,14 @@ These stages are executed sequentially once for each asset after the core item p
|
||||
|
||||
1. **[`OutputOrganizationStage`](processing/pipeline/stages/output_organization.py:14)** (`processing/pipeline/stages/output_organization.py`):
|
||||
* **Responsibility**: Determines the final output paths for all processed maps (including variants) and extra files based on configured patterns. It copies the temporary files generated by the core stages to these final destinations, creating directories as needed and respecting overwrite settings.
|
||||
* **Context Interaction**: Reads from `context.processed_maps_details` (using the "MAP_" prefixed `internal_map_type` to get the "standard type" via `get_filename_friendly_map_type` for output naming), `context.files_to_process` (for 'EXTRA' files), `context.output_base_path`, and [`Configuration`](configuration.py:68). Updates entries in `context.processed_maps_details` with final paths and organization status. Populates `context.asset_metadata['final_output_files']`. (Note: Legacy code for `'Processed_With_Variants'` status has been removed from this stage).
|
||||
* **Context Interaction**: Reads from `context.processed_maps_details`, `context.files_to_process` (for 'EXTRA' files), `context.output_base_path`, and [`Configuration`](configuration.py:68). Updates entries in `context.processed_maps_details` with organization status. Populates `context.asset_metadata['maps']` with the final map structure:
|
||||
* The `maps` object is a dictionary where keys are standard map types (e.g., "COL", "REFL").
|
||||
* Each entry contains a `variant_paths` dictionary, where keys are resolution strings (e.g., "8K", "4K") and values are the filenames of the map variants (relative to the asset's output directory).
|
||||
It also populates `context.asset_metadata['final_output_files']` with a list of absolute paths to all generated files (this list itself is not saved in the final `metadata.json`).
|
||||
|
||||
2. **[`MetadataFinalizationAndSaveStage`](processing/pipeline/stages/metadata_finalization_save.py:14)** (`processing/pipeline/stages/metadata_finalization_save.py`):
|
||||
* **Responsibility**: Finalizes the `context.asset_metadata` (setting end time, final status based on flags). It restructures the processed map details for inclusion, determines the save path for the metadata file based on configuration and patterns, serializes the metadata to JSON, and saves the `metadata.json` file to the final output location.
|
||||
* **Context Interaction**: Reads from `context.asset_metadata`, `context.processed_maps_details`, `context.merged_maps_details`, `context.output_base_path`, and [`Configuration`](configuration.py:68). Writes the `metadata.json` file and updates `context.asset_metadata` with its final path and status.
|
||||
* **Responsibility**: Finalizes the `context.asset_metadata` (setting final status based on flags). It determines the save path for the metadata file based on configuration and patterns, serializes the `context.asset_metadata` (which now contains the structured `maps` data from `OutputOrganizationStage`) to JSON, and saves the `metadata.json` file.
|
||||
* **Context Interaction**: Reads from `context.asset_metadata` (including the `maps` structure), `context.output_base_path`, and [`Configuration`](configuration.py:68). Before saving, it explicitly removes the `final_output_files` key from `context.asset_metadata`. The `processing_end_time` is also no longer added. The `metadata.json` file is written, and `context.asset_metadata` is updated with its final path and status. The older `processed_maps_details` and `merged_maps_details` from the context are not directly included in the JSON.
|
||||
|
||||
## External Steps
|
||||
|
||||
|
||||
@ -41,7 +41,7 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
|
||||
# Check Skip Flag
|
||||
if context.status_flags.get('skip_asset'):
|
||||
context.asset_metadata['status'] = "Skipped"
|
||||
context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
|
||||
# context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
|
||||
context.asset_metadata['notes'] = context.status_flags.get('skip_reason', 'Skipped early in pipeline')
|
||||
logger.info(
|
||||
f"Asset '{asset_name_for_log}': Marked as skipped. Reason: {context.asset_metadata['notes']}"
|
||||
@ -51,7 +51,7 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
|
||||
# However, if we are here, asset_metadata IS initialized.
|
||||
|
||||
# A. Finalize Metadata
|
||||
context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
|
||||
# context.asset_metadata['processing_end_time'] = datetime.datetime.now().isoformat()
|
||||
|
||||
# Determine final status (if not already set to Skipped)
|
||||
if context.asset_metadata.get('status') != "Skipped":
|
||||
@ -115,8 +115,8 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
|
||||
restructured_processed_maps[map_key] = new_map_entry
|
||||
|
||||
# Assign the restructured details. Note: 'processed_map_details' (singular 'map') is the key in asset_metadata.
|
||||
context.asset_metadata['processed_map_details'] = restructured_processed_maps
|
||||
context.asset_metadata['merged_map_details'] = getattr(context, 'merged_maps_details', {})
|
||||
# context.asset_metadata['processed_map_details'] = restructured_processed_maps
|
||||
# context.asset_metadata['merged_map_details'] = getattr(context, 'merged_maps_details', {})
|
||||
|
||||
# (Optional) Add a list of all temporary files
|
||||
# context.asset_metadata['temporary_files'] = getattr(context, 'temporary_files', []) # Assuming this is populated elsewhere
|
||||
@ -203,6 +203,8 @@ class MetadataFinalizationAndSaveStage(ProcessingStage):
|
||||
return [make_serializable(i) for i in data]
|
||||
return data
|
||||
|
||||
# final_output_files is populated by OutputOrganizationStage. Explicitly remove it as per user request.
|
||||
context.asset_metadata.pop('final_output_files', None)
|
||||
serializable_metadata = make_serializable(context.asset_metadata)
|
||||
|
||||
with open(metadata_save_path, 'w') as f:
|
||||
|
||||
@ -61,8 +61,10 @@ class OutputOrganizationStage(ProcessingStage):
|
||||
if saved_files_info and isinstance(saved_files_info, list) and len(saved_files_info) > 0:
|
||||
logger.debug(f"Asset '{asset_name_for_log}': Organizing {len(saved_files_info)} variants for map key '{processed_map_key}' (map type: {base_map_type}) from SaveVariantsStage.")
|
||||
|
||||
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(processed_map_key, {})
|
||||
map_metadata_entry['map_type'] = base_map_type
|
||||
# Use base_map_type (e.g., "COL") as the key for the map entry
|
||||
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(base_map_type, {})
|
||||
# map_type is now the key, so no need to store it inside the entry
|
||||
# map_metadata_entry['map_type'] = base_map_type
|
||||
map_metadata_entry.setdefault('variant_paths', {}) # Initialize if not present
|
||||
|
||||
processed_any_variant_successfully = False
|
||||
@ -114,8 +116,8 @@ class OutputOrganizationStage(ProcessingStage):
|
||||
# Optionally update variant_detail status if needed
|
||||
|
||||
# Store relative path in metadata
|
||||
relative_final_variant_path_str = str(Path(relative_dir_path_str_variant) / Path(output_filename_variant))
|
||||
map_metadata_entry['variant_paths'][variant_resolution_key] = relative_final_variant_path_str
|
||||
# Store only the filename, as it's relative to the metadata.json location
|
||||
map_metadata_entry['variant_paths'][variant_resolution_key] = output_filename_variant
|
||||
processed_any_variant_successfully = True
|
||||
|
||||
except Exception as e:
|
||||
@ -182,11 +184,16 @@ class OutputOrganizationStage(ProcessingStage):
|
||||
details['final_output_path'] = str(final_path)
|
||||
|
||||
# Update asset_metadata for metadata.json
|
||||
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(processed_map_key, {})
|
||||
map_metadata_entry['map_type'] = base_map_type
|
||||
map_metadata_entry['path'] = str(Path(relative_dir_path_str) / Path(output_filename)) # Store relative path
|
||||
if 'variant_paths' in map_metadata_entry: # Clean up variant paths if present from previous runs
|
||||
del map_metadata_entry['variant_paths']
|
||||
# Use base_map_type (e.g., "COL") as the key for the map entry
|
||||
map_metadata_entry = context.asset_metadata.setdefault('maps', {}).setdefault(base_map_type, {})
|
||||
# map_type is now the key, so no need to store it inside the entry
|
||||
# map_metadata_entry['map_type'] = base_map_type
|
||||
# Store single path in variant_paths, keyed by its resolution string
|
||||
# Store only the filename, as it's relative to the metadata.json location
|
||||
map_metadata_entry.setdefault('variant_paths', {})[resolution_str] = output_filename
|
||||
# Remove old cleanup logic, as variant_paths is now the standard
|
||||
# if 'variant_paths' in map_metadata_entry:
|
||||
# del map_metadata_entry['variant_paths']
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Asset '{asset_name_for_log}': Failed to copy {temp_file_path} for map key '{processed_map_key}'. Error: {e}", exc_info=True)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user