import os import sys import datetime import re import logging from pathlib import Path from typing import Optional, Dict logger = logging.getLogger(__name__) def generate_path_from_pattern(pattern_string: str, token_data: dict) -> str: """ Generates a file path by replacing tokens in a pattern string with values from the provided token_data dictionary. Args: pattern_string: The string containing tokens to be replaced (e.g., "[Assettype]/[supplier]/[assetname]_[resolution].[ext]"). token_data: A dictionary where keys are token names (without brackets, case-insensitive) and values are the replacement strings. Special tokens like 'IncrementingValue' or '####' should be provided here if used in the pattern. Returns: The generated path string with tokens replaced. Raises: ValueError: If a token required by the pattern (excluding date/time/apppath) is not found in token_data. KeyError: If internal logic fails to find expected date/time components. """ if not isinstance(pattern_string, str): raise TypeError("pattern_string must be a string") if not isinstance(token_data, dict): raise TypeError("token_data must be a dictionary") # Normalize token keys in the input data for case-insensitive matching normalized_token_data = {k.lower(): v for k, v in token_data.items()} # --- Prepare dynamic/default token values --- now = datetime.datetime.now() dynamic_tokens = { 'date': now.strftime('%Y%m%d'), 'time': now.strftime('%H%M%S'), # Provide a default ApplicationPath, can be overridden by token_data 'applicationpath': os.path.abspath(os.getcwd()) } # Merge dynamic tokens with provided data, allowing overrides # Provided data takes precedence full_token_data = {**dynamic_tokens, **normalized_token_data} # --- Define known tokens (lowercase) --- # Add variations like #### for IncrementingValue known_tokens_lc = { 'assettype', 'supplier', 'assetname', 'resolution', 'ext', 'incrementingvalue', '####', 'date', 'time', 'sha5', 'applicationpath' } output_path = pattern_string # --- Regex to find all tokens like [TokenName] --- token_pattern = re.compile(r'\[([^\]]+)\]') tokens_found = token_pattern.findall(pattern_string) processed_tokens_lc = set() for token_name in tokens_found: token_name_lc = token_name.lower() # Handle alias #### for IncrementingValue lookup_key = 'incrementingvalue' if token_name_lc == '####' else token_name_lc if lookup_key in processed_tokens_lc: continue # Already processed this token type if lookup_key in full_token_data: replacement_value = str(full_token_data[lookup_key]) # Ensure string # Replace all occurrences of this token (case-insensitive original name) # We use a regex finditer to replace only the specific token format # to avoid replacing substrings within other words. current_token_pattern = re.compile(re.escape(f'[{token_name}]'), re.IGNORECASE) output_path = current_token_pattern.sub(replacement_value, output_path) processed_tokens_lc.add(lookup_key) elif lookup_key in known_tokens_lc: # Known token but not found in data (and not a dynamic one we generated) logger.warning(f"Token '[{token_name}]' found in pattern but not in token_data.") # Raise error for non-optional tokens if needed, or replace with placeholder # For now, let's raise an error to be explicit raise ValueError(f"Required token '[{token_name}]' not found in token_data.") else: # Token not recognized logger.warning(f"Unknown token '[{token_name}]' found in pattern string. Leaving it unchanged.") # --- Final path cleaning (optional, e.g., normalize separators) --- # output_path = os.path.normpath(output_path) # Consider implications on mixed separators return output_path def get_next_incrementing_value(output_base_path: Path, output_directory_pattern: str) -> str: """Determines the next incrementing value based on existing directories.""" # Implementation as detailed in the previous plan revision... logger.debug(f"Calculating next increment value for pattern '{output_directory_pattern}' in '{output_base_path}'") match = re.match(r"(.*?)(\[IncrementingValue\]|(#+))(.*)", output_directory_pattern) if not match: logger.warning(f"Could not find incrementing token ([IncrementingValue] or #+) in pattern '{output_directory_pattern}'. Defaulting to '00'.") return "00" # Default fallback if pattern doesn't contain the token prefix_pattern, increment_token, suffix_pattern = match.groups() num_digits = len(increment_token) if increment_token.startswith("#") else 2 # Default to 2 for [IncrementingValue] if not specified otherwise logger.debug(f"Parsed pattern: prefix='{prefix_pattern}', token='{increment_token}' ({num_digits} digits), suffix='{suffix_pattern}'") # Replace other tokens in prefix/suffix with '*' for globbing glob_prefix = re.sub(r'\[[^\]]+\]', '*', prefix_pattern) glob_suffix = re.sub(r'\[[^\]]+\]', '*', suffix_pattern) # Construct the glob pattern part for the number itself glob_increment_part = f"[{'0-9' * num_digits}]" # Matches exactly num_digits glob_pattern = f"{glob_prefix}{glob_increment_part}{glob_suffix}" logger.debug(f"Constructed glob pattern: {glob_pattern}") max_value = -1 try: # Prepare regex to extract the number from directory names matching the full pattern # Escape regex special characters in the literal parts of the pattern extract_prefix_re = re.escape(prefix_pattern) extract_suffix_re = re.escape(suffix_pattern) # The regex captures exactly num_digits between the escaped prefix and suffix extract_regex = re.compile(rf"^{extract_prefix_re}(\d{{{num_digits}}}){extract_suffix_re}.*") logger.debug(f"Constructed extraction regex: {extract_regex.pattern}") if not output_base_path.is_dir(): logger.warning(f"Output base path '{output_base_path}' does not exist or is not a directory. Cannot scan for existing values.") else: for item in output_base_path.glob(glob_pattern): if item.is_dir(): logger.debug(f"Checking directory: {item.name}") num_match = extract_regex.match(item.name) if num_match: try: current_val = int(num_match.group(1)) logger.debug(f"Extracted value {current_val} from {item.name}") max_value = max(max_value, current_val) except (ValueError, IndexError) as e: logger.warning(f"Could not parse number from matching directory '{item.name}': {e}") else: logger.debug(f"Directory '{item.name}' matched glob but not extraction regex.") except Exception as e: logger.error(f"Error searching for incrementing values using glob pattern '{glob_pattern}' in '{output_base_path}': {e}", exc_info=True) # Decide on fallback behavior - returning "00" might be safer than raising return "00" # Fallback on error during search next_value = max_value + 1 format_string = f"{{:0{num_digits}d}}" next_value_str = format_string.format(next_value) logger.info(f"Determined next incrementing value: {next_value_str} (Max found: {max_value})") return next_value_str # --- Basic Unit Tests --- if __name__ == "__main__": print("Running basic tests for path_utils.generate_path_from_pattern...") test_pattern_1 = "[Assettype]/[supplier]/[assetname]_[resolution]_[Date]_[Time].[ext]" test_data_1 = { "AssetType": "Texture", "supplier": "MegaScans", "assetName": "RustyMetalPanel", "Resolution": "4k", "EXT": "png", "Sha5": "abcde" # Included but not in pattern } expected_1_base = f"Texture/MegaScans/RustyMetalPanel_4k_" try: result_1 = generate_path_from_pattern(test_pattern_1, test_data_1) assert result_1.startswith(expected_1_base) assert result_1.endswith(".png") assert len(result_1.split('_')) == 5 # Check date and time were added print(f"PASS: Test 1 - Basic replacement: {result_1}") except Exception as e: print(f"FAIL: Test 1 - {e}") test_pattern_2 = "Output/[assetname]/[assetname]_####.[ext]" test_data_2 = { "assetname": "WoodFloor", "IncrementingValue": "001", "ext": "jpg" } expected_2 = "Output/WoodFloor/WoodFloor_001.jpg" try: result_2 = generate_path_from_pattern(test_pattern_2, test_data_2) assert result_2 == expected_2 print(f"PASS: Test 2 - IncrementingValue (####): {result_2}") except Exception as e: print(f"FAIL: Test 2 - {e}") test_pattern_3 = "AppPath=[ApplicationPath]/[assetname].[ext]" test_data_3 = {"assetname": "Test", "ext": "txt"} expected_3_start = f"AppPath={os.path.abspath(os.getcwd())}/Test.txt" try: result_3 = generate_path_from_pattern(test_pattern_3, test_data_3) assert result_3 == expected_3_start print(f"PASS: Test 3 - ApplicationPath (default): {result_3}") except Exception as e: print(f"FAIL: Test 3 - {e}") test_pattern_4 = "AppPath=[ApplicationPath]/[assetname].[ext]" test_data_4 = {"assetname": "Test", "ext": "txt", "ApplicationPath": "/custom/path"} expected_4 = "/custom/path/Test.txt" # Note: AppPath= part is replaced by the token logic # Correction: The pattern includes "AppPath=", so it should remain. expected_4_corrected = "AppPath=/custom/path/Test.txt" try: result_4 = generate_path_from_pattern(test_pattern_4, test_data_4) assert result_4 == expected_4_corrected print(f"PASS: Test 4 - ApplicationPath (override): {result_4}") except Exception as e: print(f"FAIL: Test 4 - {e}") test_pattern_5 = "[assetname]/[MissingToken].[ext]" test_data_5 = {"assetname": "FailureTest", "ext": "err"} try: generate_path_from_pattern(test_pattern_5, test_data_5) print("FAIL: Test 5 - Expected ValueError for missing token") except ValueError as e: assert "MissingToken" in str(e) print(f"PASS: Test 5 - Correctly raised ValueError for missing token: {e}") except Exception as e: print(f"FAIL: Test 5 - Incorrect exception type: {e}") test_pattern_6 = "[assetname]/[UnknownToken].[ext]" test_data_6 = {"assetname": "UnknownTest", "ext": "dat"} expected_6 = "UnknownTest/[UnknownToken].dat" # Unknown tokens are left as is try: # Capture warnings logging.basicConfig() with logging.catch_warnings(record=True) as w: result_6 = generate_path_from_pattern(test_pattern_6, test_data_6) assert result_6 == expected_6 assert len(w) == 1 assert "Unknown token '[UnknownToken]'" in str(w[0].message) print(f"PASS: Test 6 - Unknown token left unchanged: {result_6}") except Exception as e: print(f"FAIL: Test 6 - {e}") test_pattern_7 = "[assetname]/[assetname].png" # Case check test_data_7 = {"AssetName": "CaseTest"} expected_7 = "CaseTest/CaseTest.png" try: result_7 = generate_path_from_pattern(test_pattern_7, test_data_7) assert result_7 == expected_7 print(f"PASS: Test 7 - Case insensitivity: {result_7}") except Exception as e: print(f"FAIL: Test 7 - {e}") print("Basic tests finished.")