Token-based output support - needs testing
46
utils/hash_utils.py
Normal file
@@ -0,0 +1,46 @@
import hashlib
import logging
from pathlib import Path
from typing import Optional

# Configure logging
logger = logging.getLogger(__name__)


def calculate_sha256(file_path: Path) -> Optional[str]:
    """
    Calculates the SHA-256 hash of a file.

    Args:
        file_path: The path to the file.

    Returns:
        The SHA-256 hash as a hexadecimal string, or None if an error occurs.
    """
    if not isinstance(file_path, Path):
        try:
            file_path = Path(file_path)
        except TypeError:
            logger.error(f"Invalid file path type: {type(file_path)}. Expected Path object or string.")
            return None

    if not file_path.is_file():
        logger.error(f"File not found or is not a regular file: {file_path}")
        return None

    sha256_hash = hashlib.sha256()
    buffer_size = 65536  # Read in 64k chunks

    try:
        with open(file_path, "rb") as f:
            while True:
                data = f.read(buffer_size)
                if not data:
                    break
                sha256_hash.update(data)
        return sha256_hash.hexdigest()
    except IOError as e:
        logger.error(f"Error reading file {file_path}: {e}")
        return None
    except Exception as e:
        logger.error(f"An unexpected error occurred while hashing {file_path}: {e}")
        return None
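
A quick way to sanity-check this helper (a minimal sketch; the file path is hypothetical):

    from pathlib import Path
    from utils.hash_utils import calculate_sha256

    digest = calculate_sha256(Path("downloads/asset.zip"))
    if digest is not None:
        print(digest)  # 64-character lowercase hex string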
256
utils/path_utils.py
Normal file
@@ -0,0 +1,256 @@
import os
import sys
import datetime
import re
import logging
from pathlib import Path
from typing import Optional, Dict

logger = logging.getLogger(__name__)

def generate_path_from_pattern(pattern_string: str, token_data: dict) -> str:
    """
    Generates a file path by replacing tokens in a pattern string with values
    from the provided token_data dictionary.

    Args:
        pattern_string: The string containing tokens to be replaced (e.g.,
            "[Assettype]/[supplier]/[assetname]_[resolution].[ext]").
        token_data: A dictionary where keys are token names (without brackets,
            case-insensitive) and values are the replacement strings.
            Special tokens like 'IncrementingValue' or '####' should
            be provided here if used in the pattern.

    Returns:
        The generated path string with tokens replaced.

    Raises:
        TypeError: If pattern_string is not a str or token_data is not a dict.
        ValueError: If a known token required by the pattern (excluding the
            dynamically generated date/time/applicationpath tokens) is missing
            from token_data.
    """
    if not isinstance(pattern_string, str):
        raise TypeError("pattern_string must be a string")
    if not isinstance(token_data, dict):
        raise TypeError("token_data must be a dictionary")

    # Normalize token keys in the input data for case-insensitive matching
    normalized_token_data = {k.lower(): v for k, v in token_data.items()}

    # --- Prepare dynamic/default token values ---
    now = datetime.datetime.now()
    dynamic_tokens = {
        'date': now.strftime('%Y%m%d'),
        'time': now.strftime('%H%M%S'),
        # Provide a default ApplicationPath; can be overridden by token_data
        'applicationpath': os.path.abspath(os.getcwd())
    }

    # Merge dynamic tokens with provided data; provided data takes precedence
    full_token_data = {**dynamic_tokens, **normalized_token_data}

    # --- Define known tokens (lowercase) ---
    # '####' is an alias for IncrementingValue
    known_tokens_lc = {
        'assettype', 'supplier', 'assetname', 'resolution', 'ext',
        'incrementingvalue', '####', 'date', 'time', 'sha5', 'applicationpath'
    }

    output_path = pattern_string

    # --- Regex to find all tokens like [TokenName] ---
    token_pattern = re.compile(r'\[([^\]]+)\]')
    tokens_found = token_pattern.findall(pattern_string)

    processed_tokens_lc = set()

    for token_name in tokens_found:
        token_name_lc = token_name.lower()

        # Handle alias [####] for IncrementingValue
        lookup_key = 'incrementingvalue' if token_name_lc == '####' else token_name_lc

        if lookup_key in processed_tokens_lc:
            continue  # Already processed this token type

        if lookup_key in full_token_data:
            replacement_value = str(full_token_data[lookup_key])  # Ensure string
            # Replace every occurrence of this exact bracketed token,
            # case-insensitively, without touching substrings of other words.
            current_token_pattern = re.compile(re.escape(f'[{token_name}]'), re.IGNORECASE)
            # Use a callable replacement so backslashes in values (e.g.
            # Windows paths) are not interpreted by sub() as regex escapes.
            output_path = current_token_pattern.sub(lambda _m, v=replacement_value: v, output_path)
            processed_tokens_lc.add(lookup_key)
        elif lookup_key in known_tokens_lc:
            # Known token, but no value was supplied and it is not one we
            # generate dynamically. Raise to be explicit.
            logger.warning(f"Token '[{token_name}]' found in pattern but not in token_data.")
            raise ValueError(f"Required token '[{token_name}]' not found in token_data.")
        else:
            # Token not recognized; leave it in place
            logger.warning(f"Unknown token '[{token_name}]' found in pattern string. Leaving it unchanged.")

    # Bare runs of '#' (e.g. "name_####.ext") also act as an alias for
    # IncrementingValue. The bracketed-token regex above cannot see them,
    # so substitute the caller-supplied (already padded) value here.
    if '#' in output_path and 'incrementingvalue' in full_token_data:
        output_path = re.sub(r'#+', str(full_token_data['incrementingvalue']), output_path)

    # --- Final path cleaning (optional, e.g., normalize separators) ---
    # output_path = os.path.normpath(output_path)  # Consider implications on mixed separators

    return output_path
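
# Example sketch (hypothetical values; the Date token is filled in at call
# time, so on 2024-01-02 this would yield "Rock_20240102.png"):
#   generate_path_from_pattern("[assetname]_[Date].[ext]",
#                              {"assetname": "Rock", "ext": "png"})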


def get_next_incrementing_value(output_base_path: Path, output_directory_pattern: str) -> str:
    """Determines the next incrementing value based on existing directories."""
    logger.debug(f"Calculating next increment value for pattern '{output_directory_pattern}' in '{output_base_path}'")
    # The alternation must not add a capture group, so that match.groups()
    # yields exactly the three parts unpacked below.
    match = re.match(r"(.*?)(\[IncrementingValue\]|#+)(.*)", output_directory_pattern)
    if not match:
        logger.warning(f"Could not find incrementing token ([IncrementingValue] or #+) in pattern '{output_directory_pattern}'. Defaulting to '00'.")
        return "00"  # Default fallback if pattern doesn't contain the token

    prefix_pattern, increment_token, suffix_pattern = match.groups()
    num_digits = len(increment_token) if increment_token.startswith("#") else 2  # Default to 2 digits for [IncrementingValue]
    logger.debug(f"Parsed pattern: prefix='{prefix_pattern}', token='{increment_token}' ({num_digits} digits), suffix='{suffix_pattern}'")

    # Replace other tokens in prefix/suffix with '*' for globbing
    glob_prefix = re.sub(r'\[[^\]]+\]', '*', prefix_pattern)
    glob_suffix = re.sub(r'\[[^\]]+\]', '*', suffix_pattern)
    # One single-digit character class per digit, so the glob matches exactly
    # num_digits digits (a single "[0-90-9]" class would match only one).
    glob_increment_part = "[0-9]" * num_digits
    glob_pattern = f"{glob_prefix}{glob_increment_part}{glob_suffix}"
    logger.debug(f"Constructed glob pattern: {glob_pattern}")

    max_value = -1
    try:
        # Prepare regex to extract the number from matching directory names.
        # Escape the literal parts, but let any remaining [Token] in the
        # prefix/suffix match arbitrary text, mirroring the glob above;
        # escaping the brackets literally would never match a real name.
        extract_prefix_re = '.*?'.join(re.escape(p) for p in re.split(r'\[[^\]]+\]', prefix_pattern))
        extract_suffix_re = '.*?'.join(re.escape(p) for p in re.split(r'\[[^\]]+\]', suffix_pattern))
        # The regex captures exactly num_digits between the prefix and suffix
        extract_regex = re.compile(rf"^{extract_prefix_re}(\d{{{num_digits}}}){extract_suffix_re}.*")
        logger.debug(f"Constructed extraction regex: {extract_regex.pattern}")

        if not output_base_path.is_dir():
            logger.warning(f"Output base path '{output_base_path}' does not exist or is not a directory. Cannot scan for existing values.")
        else:
            for item in output_base_path.glob(glob_pattern):
                if item.is_dir():
                    logger.debug(f"Checking directory: {item.name}")
                    num_match = extract_regex.match(item.name)
                    if num_match:
                        try:
                            current_val = int(num_match.group(1))
                            logger.debug(f"Extracted value {current_val} from {item.name}")
                            max_value = max(max_value, current_val)
                        except (ValueError, IndexError) as e:
                            logger.warning(f"Could not parse number from matching directory '{item.name}': {e}")
                    else:
                        logger.debug(f"Directory '{item.name}' matched glob but not extraction regex.")

    except Exception as e:
        logger.error(f"Error searching for incrementing values using glob pattern '{glob_pattern}' in '{output_base_path}': {e}", exc_info=True)
        # Returning "00" here is safer than raising
        return "00"  # Fallback on error during search

    next_value = max_value + 1
    format_string = f"{{:0{num_digits}d}}"
    next_value_str = format_string.format(next_value)
    logger.info(f"Determined next incrementing value: {next_value_str} (Max found: {max_value})")
    return next_value_str
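
# Example sketch: with existing directories "Render_00" and "Render_01"
# under some base path, get_next_incrementing_value(base, "Render_##")
# scans for two-digit matches and returns "02".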


# --- Basic Unit Tests ---
if __name__ == "__main__":
    print("Running basic tests for path_utils.generate_path_from_pattern...")

    test_pattern_1 = "[Assettype]/[supplier]/[assetname]_[resolution]_[Date]_[Time].[ext]"
    test_data_1 = {
        "AssetType": "Texture",
        "supplier": "MegaScans",
        "assetName": "RustyMetalPanel",
        "Resolution": "4k",
        "EXT": "png",
        "Sha5": "abcde"  # Included but not in pattern
    }
    expected_1_base = "Texture/MegaScans/RustyMetalPanel_4k_"
    try:
        result_1 = generate_path_from_pattern(test_pattern_1, test_data_1)
        assert result_1.startswith(expected_1_base)
        assert result_1.endswith(".png")
        # Three underscores separate name, resolution, date and time,
        # so splitting on '_' yields four parts.
        assert len(result_1.split('_')) == 4
        print(f"PASS: Test 1 - Basic replacement: {result_1}")
    except Exception as e:
        print(f"FAIL: Test 1 - {e}")

    test_pattern_2 = "Output/[assetname]/[assetname]_####.[ext]"
    test_data_2 = {
        "assetname": "WoodFloor",
        "IncrementingValue": "001",
        "ext": "jpg"
    }
    expected_2 = "Output/WoodFloor/WoodFloor_001.jpg"
    try:
        result_2 = generate_path_from_pattern(test_pattern_2, test_data_2)
        assert result_2 == expected_2
        print(f"PASS: Test 2 - IncrementingValue (####): {result_2}")
    except Exception as e:
        print(f"FAIL: Test 2 - {e}")

    test_pattern_3 = "AppPath=[ApplicationPath]/[assetname].[ext]"
    test_data_3 = {"assetname": "Test", "ext": "txt"}
    expected_3 = f"AppPath={os.path.abspath(os.getcwd())}/Test.txt"
    try:
        result_3 = generate_path_from_pattern(test_pattern_3, test_data_3)
        assert result_3 == expected_3
        print(f"PASS: Test 3 - ApplicationPath (default): {result_3}")
    except Exception as e:
        print(f"FAIL: Test 3 - {e}")

    test_pattern_4 = "AppPath=[ApplicationPath]/[assetname].[ext]"
    test_data_4 = {"assetname": "Test", "ext": "txt", "ApplicationPath": "/custom/path"}
    # The literal "AppPath=" prefix is not a token, so it remains in the output.
    expected_4 = "AppPath=/custom/path/Test.txt"
    try:
        result_4 = generate_path_from_pattern(test_pattern_4, test_data_4)
        assert result_4 == expected_4
        print(f"PASS: Test 4 - ApplicationPath (override): {result_4}")
    except Exception as e:
        print(f"FAIL: Test 4 - {e}")

    # 'supplier' is a known token, so omitting it from token_data must raise;
    # unrecognized tokens are covered separately in Test 6.
    test_pattern_5 = "[assetname]/[supplier].[ext]"
    test_data_5 = {"assetname": "FailureTest", "ext": "err"}
    try:
        generate_path_from_pattern(test_pattern_5, test_data_5)
        print("FAIL: Test 5 - Expected ValueError for missing token")
    except ValueError as e:
        assert "supplier" in str(e)
        print(f"PASS: Test 5 - Correctly raised ValueError for missing token: {e}")
    except Exception as e:
        print(f"FAIL: Test 5 - Incorrect exception type: {e}")

    test_pattern_6 = "[assetname]/[UnknownToken].[ext]"
    test_data_6 = {"assetname": "UnknownTest", "ext": "dat"}
    expected_6 = "UnknownTest/[UnknownToken].dat"  # Unknown tokens are left as is
    try:
        # Unknown tokens are reported via logger.warning (not the warnings
        # module), so just make the log visible and check the result.
        logging.basicConfig()
        result_6 = generate_path_from_pattern(test_pattern_6, test_data_6)
        assert result_6 == expected_6
        print(f"PASS: Test 6 - Unknown token left unchanged: {result_6}")
    except Exception as e:
        print(f"FAIL: Test 6 - {e}")

    test_pattern_7 = "[assetname]/[assetname].png"  # Case check
    test_data_7 = {"AssetName": "CaseTest"}
    expected_7 = "CaseTest/CaseTest.png"
    try:
        result_7 = generate_path_from_pattern(test_pattern_7, test_data_7)
        assert result_7 == expected_7
        print(f"PASS: Test 7 - Case insensitivity: {result_7}")
    except Exception as e:
        print(f"FAIL: Test 7 - {e}")

    print("Basic tests finished.")