264 lines
12 KiB
Python
264 lines
12 KiB
Python
import os
|
|
import sys
|
|
import datetime
|
|
import re
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, Dict
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def generate_path_from_pattern(pattern_string: str, token_data: dict) -> str:
|
|
"""
|
|
Generates a file path by replacing tokens in a pattern string with values
|
|
from the provided token_data dictionary.
|
|
|
|
Args:
|
|
pattern_string: The string containing tokens to be replaced (e.g.,
|
|
"[Assettype]/[supplier]/[assetname]_[resolution].[ext]").
|
|
token_data: A dictionary where keys are token names (without brackets,
|
|
case-insensitive) and values are the replacement strings.
|
|
Special tokens like 'IncrementingValue' or '####' should
|
|
be provided here if used in the pattern.
|
|
|
|
Returns:
|
|
The generated path string with tokens replaced.
|
|
|
|
Raises:
|
|
ValueError: If a token required by the pattern (excluding date/time/apppath)
|
|
is not found in token_data.
|
|
KeyError: If internal logic fails to find expected date/time components.
|
|
"""
|
|
if not isinstance(pattern_string, str):
|
|
raise TypeError("pattern_string must be a string")
|
|
if not isinstance(token_data, dict):
|
|
raise TypeError("token_data must be a dictionary")
|
|
|
|
# Normalize token keys in the input data for case-insensitive matching
|
|
normalized_token_data = {k.lower(): v for k, v in token_data.items()}
|
|
|
|
# --- Prepare dynamic/default token values ---
|
|
now = datetime.datetime.now()
|
|
dynamic_tokens = {
|
|
'date': now.strftime('%Y%m%d'),
|
|
'time': now.strftime('%H%M%S'),
|
|
# Provide a default ApplicationPath, can be overridden by token_data
|
|
'applicationpath': os.path.abspath(os.getcwd())
|
|
}
|
|
|
|
# Merge dynamic tokens with provided data, allowing overrides
|
|
# Provided data takes precedence
|
|
full_token_data = {**dynamic_tokens, **normalized_token_data}
|
|
|
|
# --- Define known tokens (lowercase) ---
|
|
# Add variations like #### for IncrementingValue
|
|
known_tokens_lc = {
|
|
'assettype', 'supplier', 'assetname', 'resolution', 'ext',
|
|
'incrementingvalue', '####', 'date', 'time', 'sha5', 'applicationpath'
|
|
}
|
|
|
|
output_path = pattern_string
|
|
|
|
# --- Regex to find all tokens like [TokenName] ---
|
|
token_pattern = re.compile(r'\[([^\]]+)\]')
|
|
tokens_found = token_pattern.findall(pattern_string)
|
|
|
|
processed_tokens_lc = set()
|
|
|
|
for token_name in tokens_found:
|
|
token_name_lc = token_name.lower()
|
|
|
|
# Handle alias #### for IncrementingValue
|
|
lookup_key = 'incrementingvalue' if token_name_lc == '####' else token_name_lc
|
|
|
|
if lookup_key in processed_tokens_lc:
|
|
continue # Already processed this token type
|
|
|
|
if lookup_key in full_token_data:
|
|
replacement_value = str(full_token_data[lookup_key]) # Ensure string
|
|
# Replace all occurrences of this token (case-insensitive original name)
|
|
# We use a regex finditer to replace only the specific token format
|
|
# to avoid replacing substrings within other words.
|
|
current_token_pattern = re.compile(re.escape(f'[{token_name}]'), re.IGNORECASE)
|
|
output_path = current_token_pattern.sub(replacement_value, output_path)
|
|
processed_tokens_lc.add(lookup_key)
|
|
elif lookup_key in known_tokens_lc:
|
|
# Known token but not found in data (and not a dynamic one we generated)
|
|
logger.warning(f"Token '[{token_name}]' found in pattern but not in token_data.")
|
|
# Raise error for non-optional tokens if needed, or replace with placeholder
|
|
# For now, let's raise an error to be explicit
|
|
raise ValueError(f"Required token '[{token_name}]' not found in token_data.")
|
|
else:
|
|
# Token not recognized
|
|
logger.warning(f"Unknown token '[{token_name}]' found in pattern string. Leaving it unchanged.")
|
|
|
|
# --- Final path cleaning (optional, e.g., normalize separators) ---
|
|
# output_path = os.path.normpath(output_path) # Consider implications on mixed separators
|
|
|
|
return output_path
|
|
def get_next_incrementing_value(output_base_path: Path, output_directory_pattern: str) -> str:
|
|
"""Determines the next incrementing value based on existing directories."""
|
|
logger.debug(f"Calculating next increment value for pattern '{output_directory_pattern}' in '{output_base_path}'")
|
|
match = re.match(r"(.*?)(\[IncrementingValue\]|(#+))(.*)", output_directory_pattern)
|
|
if not match:
|
|
logger.warning(f"Could not find incrementing token ([IncrementingValue] or #+) in pattern '{output_directory_pattern}'. Defaulting to '00'.")
|
|
return "00" # Default fallback if pattern doesn't contain the token
|
|
|
|
prefix_pattern, increment_token, suffix_pattern = match.groups()
|
|
num_digits = len(increment_token) if increment_token.startswith("#") else 2 # Default to 2 for [IncrementingValue] if not specified otherwise
|
|
logger.debug(f"Parsed pattern: prefix='{prefix_pattern}', token='{increment_token}' ({num_digits} digits), suffix='{suffix_pattern}'")
|
|
|
|
# Replace other tokens in prefix/suffix with '*' for globbing
|
|
glob_prefix = re.sub(r'\[[^\]]+\]', '*', prefix_pattern)
|
|
glob_suffix = re.sub(r'\[[^\]]+\]', '*', suffix_pattern)
|
|
# Construct the glob pattern part for the number itself
|
|
glob_increment_part = f"[{'0-9' * num_digits}]" # Matches exactly num_digits
|
|
glob_pattern = f"{glob_prefix}{glob_increment_part}{glob_suffix}"
|
|
logger.debug(f"Constructed glob pattern: {glob_pattern}")
|
|
|
|
max_value = -1
|
|
try:
|
|
# Prepare regex to extract the number from directory names matching the full pattern
|
|
# Escape regex special characters in the literal parts of the pattern
|
|
extract_prefix_re = re.escape(prefix_pattern)
|
|
extract_suffix_re = re.escape(suffix_pattern)
|
|
# The regex captures exactly num_digits between the escaped prefix and suffix
|
|
extract_regex = re.compile(rf"^{extract_prefix_re}(\d{{{num_digits}}}){extract_suffix_re}.*")
|
|
logger.debug(f"Constructed extraction regex: {extract_regex.pattern}")
|
|
|
|
if not output_base_path.is_dir():
|
|
logger.warning(f"Output base path '{output_base_path}' does not exist or is not a directory. Cannot scan for existing values.")
|
|
else:
|
|
for item in output_base_path.glob(glob_pattern):
|
|
if item.is_dir():
|
|
logger.debug(f"Checking directory: {item.name}")
|
|
num_match = extract_regex.match(item.name)
|
|
if num_match:
|
|
try:
|
|
current_val = int(num_match.group(1))
|
|
logger.debug(f"Extracted value {current_val} from {item.name}")
|
|
max_value = max(max_value, current_val)
|
|
except (ValueError, IndexError) as e:
|
|
logger.warning(f"Could not parse number from matching directory '{item.name}': {e}")
|
|
else:
|
|
logger.debug(f"Directory '{item.name}' matched glob but not extraction regex.")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching for incrementing values using glob pattern '{glob_pattern}' in '{output_base_path}': {e}", exc_info=True)
|
|
# Decide on fallback behavior - returning "00" might be safer than raising
|
|
return "00" # Fallback on error during search
|
|
|
|
next_value = max_value + 1
|
|
format_string = f"{{:0{num_digits}d}}"
|
|
next_value_str = format_string.format(next_value)
|
|
logger.info(f"Determined next incrementing value: {next_value_str} (Max found: {max_value})")
|
|
return next_value_str
|
|
|
|
def sanitize_filename(name: str) -> str:
|
|
"""Removes or replaces characters invalid for filenames/directory names."""
|
|
if not isinstance(name, str): name = str(name)
|
|
name = re.sub(r'[^\w.\-]+', '_', name) # Allow alphanumeric, underscore, hyphen, dot
|
|
name = re.sub(r'_+', '_', name)
|
|
name = name.strip('_')
|
|
if not name: name = "invalid_name"
|
|
return name
|
|
|
|
# --- Basic Unit Tests ---
|
|
if __name__ == "__main__":
|
|
print("Running basic tests for path_utils.generate_path_from_pattern...")
|
|
|
|
test_pattern_1 = "[Assettype]/[supplier]/[assetname]_[resolution]_[Date]_[Time].[ext]"
|
|
test_data_1 = {
|
|
"AssetType": "Texture",
|
|
"supplier": "MegaScans",
|
|
"assetName": "RustyMetalPanel",
|
|
"Resolution": "4k",
|
|
"EXT": "png",
|
|
"Sha5": "abcde" # Included but not in pattern
|
|
}
|
|
expected_1_base = f"Texture/MegaScans/RustyMetalPanel_4k_"
|
|
try:
|
|
result_1 = generate_path_from_pattern(test_pattern_1, test_data_1)
|
|
assert result_1.startswith(expected_1_base)
|
|
assert result_1.endswith(".png")
|
|
assert len(result_1.split('_')) == 5 # Check date and time were added
|
|
print(f"PASS: Test 1 - Basic replacement: {result_1}")
|
|
except Exception as e:
|
|
print(f"FAIL: Test 1 - {e}")
|
|
|
|
test_pattern_2 = "Output/[assetname]/[assetname]_####.[ext]"
|
|
test_data_2 = {
|
|
"assetname": "WoodFloor",
|
|
"IncrementingValue": "001",
|
|
"ext": "jpg"
|
|
}
|
|
expected_2 = "Output/WoodFloor/WoodFloor_001.jpg"
|
|
try:
|
|
result_2 = generate_path_from_pattern(test_pattern_2, test_data_2)
|
|
assert result_2 == expected_2
|
|
print(f"PASS: Test 2 - IncrementingValue (####): {result_2}")
|
|
except Exception as e:
|
|
print(f"FAIL: Test 2 - {e}")
|
|
|
|
test_pattern_3 = "AppPath=[ApplicationPath]/[assetname].[ext]"
|
|
test_data_3 = {"assetname": "Test", "ext": "txt"}
|
|
expected_3_start = f"AppPath={os.path.abspath(os.getcwd())}/Test.txt"
|
|
try:
|
|
result_3 = generate_path_from_pattern(test_pattern_3, test_data_3)
|
|
assert result_3 == expected_3_start
|
|
print(f"PASS: Test 3 - ApplicationPath (default): {result_3}")
|
|
except Exception as e:
|
|
print(f"FAIL: Test 3 - {e}")
|
|
|
|
test_pattern_4 = "AppPath=[ApplicationPath]/[assetname].[ext]"
|
|
test_data_4 = {"assetname": "Test", "ext": "txt", "ApplicationPath": "/custom/path"}
|
|
expected_4 = "/custom/path/Test.txt" # Note: AppPath= part is replaced by the token logic
|
|
# Correction: The pattern includes "AppPath=", so it should remain.
|
|
expected_4_corrected = "AppPath=/custom/path/Test.txt"
|
|
try:
|
|
result_4 = generate_path_from_pattern(test_pattern_4, test_data_4)
|
|
assert result_4 == expected_4_corrected
|
|
print(f"PASS: Test 4 - ApplicationPath (override): {result_4}")
|
|
except Exception as e:
|
|
print(f"FAIL: Test 4 - {e}")
|
|
|
|
|
|
test_pattern_5 = "[assetname]/[MissingToken].[ext]"
|
|
test_data_5 = {"assetname": "FailureTest", "ext": "err"}
|
|
try:
|
|
generate_path_from_pattern(test_pattern_5, test_data_5)
|
|
print("FAIL: Test 5 - Expected ValueError for missing token")
|
|
except ValueError as e:
|
|
assert "MissingToken" in str(e)
|
|
print(f"PASS: Test 5 - Correctly raised ValueError for missing token: {e}")
|
|
except Exception as e:
|
|
print(f"FAIL: Test 5 - Incorrect exception type: {e}")
|
|
|
|
|
|
test_pattern_6 = "[assetname]/[UnknownToken].[ext]"
|
|
test_data_6 = {"assetname": "UnknownTest", "ext": "dat"}
|
|
expected_6 = "UnknownTest/[UnknownToken].dat" # Unknown tokens are left as is
|
|
try:
|
|
# Capture warnings
|
|
logging.basicConfig()
|
|
with logging.catch_warnings(record=True) as w:
|
|
result_6 = generate_path_from_pattern(test_pattern_6, test_data_6)
|
|
assert result_6 == expected_6
|
|
assert len(w) == 1
|
|
assert "Unknown token '[UnknownToken]'" in str(w[0].message)
|
|
print(f"PASS: Test 6 - Unknown token left unchanged: {result_6}")
|
|
except Exception as e:
|
|
print(f"FAIL: Test 6 - {e}")
|
|
|
|
test_pattern_7 = "[assetname]/[assetname].png" # Case check
|
|
test_data_7 = {"AssetName": "CaseTest"}
|
|
expected_7 = "CaseTest/CaseTest.png"
|
|
try:
|
|
result_7 = generate_path_from_pattern(test_pattern_7, test_data_7)
|
|
assert result_7 == expected_7
|
|
print(f"PASS: Test 7 - Case insensitivity: {result_7}")
|
|
except Exception as e:
|
|
print(f"FAIL: Test 7 - {e}")
|
|
|
|
|
|
print("Basic tests finished.") |