# Asset-Frameworker/Deprecated/POC/Standalonebatcher-Main.py
# 2025-04-29 18:26:13 +02:00
import os
import json
import time
import shutil
import zipfile
from zipfile import ZipFile
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
import cv2
import re
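
# Standalone batcher (proof of concept). Watches the configured download directories
# and pushes each new asset .zip through a "trickle-down" pipeline of numbered layers:
#   layer 0: download landing folder; zips are handled one at a time, extras are queued
#   layer 1: the zip is extracted to layer 2 and the source zip is archived
#   layer 2: the extracted directory is split into one directory per set of files
#   layer 3: unwanted files and undersized images are removed
#   layer 4: an archetype prefix is determined; model files (e.g. .fbx) are moved out
#   layer 5: texture files are renamed to the standardized naming scheme
#   layer 6: images are resized to power-of-two sizes plus a ladder of lower resolutions
#   layer 7: finished files are distributed to their configured output paths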
# Define a class for watching directories and handling file events.
class DirectoryWatcher:
# Constructor to initialize the class with required parameters.
def __init__(
self,
watched_directories,
trickle_down_callbacks,
json_path,
callback_json_file_update,
misc_passtrough
):
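        # Note: directories_to_watch is built from the module-level extracted_json_settings
        # (loaded by init() before this class is constructed), not from watched_directories.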
self.directories_to_watch = [
Path(parent["name"]) / Path(directory)
for parent in extracted_json_settings["download_sources"]
for directory in parent["process_directories_names"]
]
self.watched_directories = watched_directories
self.callbacks = trickle_down_callbacks
self.json_file_path = json_path
self.callback_json = callback_json_file_update
self.misc_passtrough = misc_passtrough
self.event_handler = None
# Method to start watching the directories and handle file events.
def watch(self):
# Create a dictionary to store the initial state of files in the watched directories.
file_dict = {}
for parent in extracted_json_settings["download_sources"]:
parent_name = Path(parent["name"])
for directory in parent["process_directories_names"]:
dir_path = parent_name / Path(directory)
# Store the initial state of files in the directory.
file_dict[dir_path] = set(os.listdir(dir_path))
self.file_dict = file_dict
# Get the modification time of the JSON file to monitor changes.
json_file_mtime = os.path.getmtime(self.json_file_path)
self.event_handler = CustomFileSystemEventHandler(self.callbacks, self.watched_directories, self.misc_passtrough)
observer = Observer()
for path in self.directories_to_watch:
# Add the event handler to each directory being watched.
observer.schedule(self.event_handler, path=path, recursive=False)
observer.start()
try:
while True:
# Sleep for a second to avoid excessive checks.
time.sleep(1)
# Check if the JSON file has been modified since the last check.
if os.path.getmtime(self.json_file_path) > json_file_mtime:
json_file_mtime = os.path.getmtime(self.json_file_path)
# Call the JSON update callback function.
self.callback_json()
except KeyboardInterrupt:
# Stop the observer in case of a keyboard interrupt (e.g., Ctrl+C).
observer.stop()
# Wait for the observer to finish and join the main thread.
observer.join()
# Define a custom FileSystemEventHandler to process file events.
class CustomFileSystemEventHandler(FileSystemEventHandler):
def __init__(self, callbacks, watched_directories, misc_passtrough):
super().__init__()
self.callbacks = callbacks
self.watched_directories = watched_directories
self.misc_passtrough = misc_passtrough
self.process_new_files = True
# Method to handle the "on_created" event when a file is created.
def on_created(self, event):
file_path = event.src_path
# Use split() method to extract the download_source directory of the path
download_source = file_path.split("\\", 1)[0]
        # Find the matching entry for the current download_source directory in the JSON data
        matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
        if matching_entries:
            # Extract the "queue_directory_path" from all matching entries
            queue_directory_path = []
            for entry in matching_entries:
                queue_directory_path.extend(entry["queue_directory_path"])
            # Extract the "process_directories_names" from all matching entries
            process_directories_names = []
            for entry in matching_entries:
                process_directories_names.extend(entry["process_directories_names"])
            download_folder = os.path.join(download_source, process_directories_names[0])
print(f"File created: {file_path}")
print(f"Watched_directories: {self.watched_directories}")
break_loop = False
        # Trigger the corresponding callback based on which watched directory the file is in
        for directory_list in self.watched_directories:
            print(f"Directory list: {directory_list}")
            for index, directory in enumerate(directory_list):
                print(f"Directory: {directory}")
                print(f"File path: {file_path}")
                print(f"Index: {index}")
                # If the new file is in layer 7 (the final layer), we set process_new_files back to
                # True so new downloads are handled again, and pull the next file from the queue
                if f'\\{directory}\\' in file_path and index == 7:
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
self.callbacks[index](file_path)
self.process_new_files = True
print(f"'process_new_files' is set to: {self.process_new_files}")
files_in_queue = get_files_in_directory(queue_directory_path[0])
print(f"Current files in queue: {files_in_queue}")
if files_in_queue:
print(f"Getting next file from queue: {files_in_queue[0]}")
# Sort the list of files by file size
files_in_queue.sort(key=lambda x: os.path.getsize(x))
file_mover(files_in_queue[0], os.path.join(download_folder, os.path.basename(files_in_queue[0])))
break_loop = True
                # If the new file is in layer 4 we process it and, if allowed, pull the next file from the queue
elif f'\\{directory}\\' in file_path and index == 4:
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
self.callbacks[index](file_path)
if self.process_new_files:
files_in_queue = get_files_in_directory(queue_directory_path[0])
print(f"Current files in queue: {files_in_queue}")
if files_in_queue:
print(f"Getting next file from queue: {files_in_queue[0]}")
# Sort the list of files by file size
files_in_queue.sort(key=lambda x: os.path.getsize(x))
file_mover(files_in_queue[0], os.path.join(download_folder, os.path.basename(files_in_queue[0])))
break_loop = True
# If new file is in layer 3 of trickle-down structure we also pass the list of files to remove from directories
elif f'\\{directory}\\' in file_path and index == 3:
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
self.callbacks[index](file_path, self.misc_passtrough[1])
break_loop = True
# If new file is in layer 1 of trickle-down structure we also pass the list of archive directories
elif f'\\{directory}\\' in file_path and index == 1:
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
self.callbacks[index](file_path, self.misc_passtrough[0])
break_loop = True
                # If the new .zip file is in layer 0 and processing is enabled, set process_new_files
                # to False so no further downloads are handled until this one reaches layer 7
elif f'\\{directory}\\' in file_path and index == 0 and self.process_new_files:
if ".zip" in file_path:
self.process_new_files = False
print(f"'process_new_files' is set to: {self.process_new_files}")
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
self.callbacks[index](file_path)
break_loop = True
# Otherwise we just pass the file path
elif f'\\{directory}\\' in file_path:
print(f"'process_new_files' is set to: {self.process_new_files}")
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
if index == 0 and ".zip" in file_path:
print(f"Moving file: {file_path}\nTo queue at path: {str(os.path.join(queue_directory_path[0], os.path.basename(file_path)))}")
file_mover(file_path, os.path.join(queue_directory_path[0], os.path.basename(file_path)))
else:
self.callbacks[index](file_path)
break_loop = True
                if break_loop:
                    break
            if break_loop:
                break
def zip_files(files_to_zip, zip_filename):
    """Function for zipping the given files into zip_filename without their directory trees"""
    with ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file_path in files_to_zip:
            if os.path.exists(file_path):
                # arcname strips the directory tree without chdir-ing around,
                # which would not be safe alongside the watchdog observer thread
                zipf.write(file_path, arcname=os.path.basename(file_path))
            else:
                print(f"File not found: {file_path}")
def file_extractor(from_path, extract_to_path):
"""Function for handling unzipping of new files"""
# Making variables accessible in the scope of the function
global __settings__
# Add a short delay before further operations to allow the watchdog library to release its lock on the file.
time.sleep(__settings__[3])
    # Convert extract_to_path to a string
    extract_to_path_str = str(extract_to_path)
    # Name the extraction directory after the archive, stripping the trailing ".zip"
    new_file_name = extract_to_path_str + from_path[from_path.rindex("\\"):-4]
    if __settings__[1]: # Debugging
        print(f"Attempting extraction to: {new_file_name}")
    # Attempt extracting all files from the .zip file
    try:
        if not os.path.isdir(extract_to_path_str):
            # Only create a missing target directory if the settings allow it
            if not __settings__[2]:
                return
            if __settings__[1]: # Debugging
                print(f"Attempting to create directory: {extract_to_path_str}")
            os.mkdir(extract_to_path_str)
            if __settings__[1]: # Debugging
                print(f"Successfully created directory: {extract_to_path_str}")
        with ZipFile(from_path, "r") as zipped_object:
            zipped_object.extractall(new_file_name)
        if __settings__[1]: # Debugging
            print(f"Successfully extracted new file to {new_file_name}")
    except Exception as e:
        if __settings__[1]: # Debugging
            print(f"Failed with error: {e}")
def file_mover(from_path, to_path):
"""Function for moving files from path to path"""
# Making variables accessible in scope of function
global __settings__
    if __settings__[1]: # Debugging
        print(f"Attempting to move file: {from_path}\n to: {to_path}")
    # Add a short delay before further operations to allow the watchdog library to release its lock on the file.
    time.sleep(__settings__[3])
    # Attempt moving the file
    try:
        if os.path.exists(to_path):
            if __settings__[1]: # Debugging
                print("Duplicate found. Attempting to remove existing file or directory.")
            try:
                if os.path.isfile(to_path):
                    os.remove(to_path)
                else:
                    shutil.rmtree(to_path)
                if __settings__[1]: # Debugging
                    print("Successfully removed existing file or directory.")
            except Exception as e:
                if __settings__[1]: # Debugging
                    print(f"Failed removing existing file or directory with error: {e}")
        # Create the destination directory if it doesn't exist
        os.makedirs(os.path.dirname(to_path), exist_ok=True)
        shutil.move(from_path, to_path)
        # Update the file's modification time to the current time
        current_time = time.time()
        os.utime(to_path, (current_time, current_time))
        if __settings__[1]: # Debugging
            print(f"Successfully moved file: {from_path}\n to: {to_path}")
# In case of a failed attempt, handle error
except Exception as e:
if __settings__[1]: # Debugging
print(f"Failed moving file: {from_path}\n to: {to_path} \n with error: {e}")
def file_renamer(current_name, new_name):
"""Function for renaming files"""
# Making variables accessible in scope of function
global __settings__
    if __settings__[1]: # Debugging
        print(f"Attempting to rename file: {current_name}\n to: {new_name}")
    # Add a short delay before further operations to allow the watchdog library to release its lock on the file.
    time.sleep(__settings__[3])
    # Attempt renaming the file
    try:
        if os.path.exists(new_name):
            if __settings__[1]: # Debugging
                print("Duplicate found. Attempting to remove existing file or directory.")
            try:
                if os.path.isfile(new_name):
                    os.remove(new_name)
                else:
                    shutil.rmtree(new_name)
                if __settings__[1]: # Debugging
                    print("Successfully removed existing file or directory.")
            except Exception as e:
                if __settings__[1]: # Debugging
                    print(f"Failed to remove existing file or directory with error: {e}")
        os.rename(current_name, new_name)
        if __settings__[1]: # Debugging
            print(f"Successfully renamed file: {current_name}\n to: {new_name}")
    # In case of a failed attempt, handle the error
    except Exception as e:
        if __settings__[1]: # Debugging
            print(f"Failed renaming file: {current_name}\n to: {new_name}\nwith error: {e}")
def get_files_in_directory(directory):
    """Function for returning the full paths of all files in a directory, searching recursively"""
    file_list = [] # List to store the file paths
    # Walk the directory tree
    for root, dirs, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file) # Get the full path of the file
            file_list.append(file_path) # Add the file path to the list
    return file_list
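# Example: if "Downloads\\2_Sets" contains "setA\\rock_col.jpg", this returns
# ["Downloads\\2_Sets\\setA\\rock_col.jpg"] (names here are purely illustrative).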
def archetype_determiner(directory_path, json_object):
"""Function for determining the type of data downloaded and adding the appropriate prefix to the naming scheme"""
#### Minus keywords is keywords which does not fit in a naming scheme.
#### And keywords is keywords which by themselves, when no minus keyword is present, can determine an archetype.
#### Plus keywords is keywords which is welcome in the namingscheme.
# Making variables accessible in the scope of the function
global __settings__
if __settings__[1]: # Debugging
print(f"Attempting to determine archetype for file: {directory_path}")
else:
pass
# Get the file names in the directory
files = get_files_in_directory(directory_path)
# Use split() method to extract the download_source directory of the path
download_source = directory_path.split("\\", 1)[0]
# Find the matching entry for the current download_source directory in the JSON data
matching_entries = [entry for entry in json_object["download_sources"] if entry["name"] in download_source]
if matching_entries:
# Extract the "files_to_remove_from_directories" from all matching entries
scheme = []
for entry in matching_entries:
scheme.extend(entry["scheme"])
        # Assuming there's only one dictionary in the "scheme" list
        scheme_folder = list(scheme[0].keys())
        if __settings__[1]: # Debugging
            print(f"Archetype scheme folders: {scheme_folder}")
##########################################
###### Finding Keywords in filename ######
##########################################
# Lists for keeping the found keywords. Initiated as empty.
found_minus = []
found_and = []
found_plus = []
# Iterate through the json file subfolders of the scheme folder
for subfolder in scheme_folder:
if __settings__[1]: # Debugging
print(f"Searching for keywords of scheme: {subfolder}")
else:
pass
# Iterate through all files in the directory
for file in files:
if __settings__[1]: # Debugging
print(f"Searching for keywords in file: {file}")
else:
pass
# Iterate through all keywords listed in the minus subsubfolder of the current subfolder of the scheme folder
for keyword in (scheme[0][subfolder])[0]["-"]:
# We check if the current keyword is in the filename without regarding capital case letters.
# If True, then we append it to the corresponding list initialized earlier.
if keyword.lower() in file.lower():
found_minus.append(keyword)
# Iterate through all keywords listed in the and subsubfolder of the current subfolder of the scheme folder
for keyword in (scheme[0][subfolder])[0]["&"]:
# We check if the current keyword is in the filename without regarding capital case letters.
# If True, then we append it to the corresponding list initialized earlier.
if keyword.lower() in file.lower():
found_and.append(keyword)
# Iterate through all keywords listed in the plus subsubfolder of the current subfolder of the scheme folder
for keyword in (scheme[0][subfolder])[0]["+"]:
# We check if the current keyword is in the filename without regarding capital case letters.
# If True, then we append it to the corresponding list initialized earlier.
if keyword.lower() in file.lower():
found_plus.append(keyword)
                if __settings__[1]: # Debugging
print(
f"Found following keywords: \n Minus : {found_minus} \n And : {found_and} \n Plus : {found_plus}"
)
else:
pass
########################################
###### Determining file archetype ######
########################################
# Making all found keywords lowercase, while also sorting alphabetically and remove duplicates
sorted_found_minus = sorted(set([item.lower() for item in found_minus]))
sorted_found_and = sorted(set([item.lower() for item in found_and]))
sorted_found_plus = sorted(set([item.lower() for item in found_plus]))
        if __settings__[1]: # Debugging
print(
f"Sorting keywords alphabetically, removing duplicates and making keywords lowercase: \n Minus : {sorted_found_minus} \n And : {sorted_found_and} \n Plus : {sorted_found_plus}"
)
else:
pass
try:
# Iterate through the json file subfolders of the scheme folder
for subfolder in scheme_folder:
                if __settings__[1]: # Debugging
print(
f"Attempting to find file archetype by comparing found keywords to the naming scheme: {subfolder}"
)
else:
pass
# Making all keywords of the subfolder lowercase, while also sorting alphabetically and remove duplicates
sorted_minus = sorted(
set([item.lower() for item in (scheme[0][subfolder])[0]["-"]])
)
sorted_and = sorted(
set([item.lower() for item in (scheme[0][subfolder])[0]["&"]])
)
sorted_plus = sorted(
set([item.lower() for item in (scheme[0][subfolder])[0]["+"]])
)
# Proceed if none of the minus keywords are found
if not any(keyword.lower() in sorted_found_minus for keyword in sorted_minus):
# If any and keywords found match those of the subfolder then save the subfolder as archetype and return it to function call
if any(keyword.lower() in sorted_found_and for keyword in sorted_and):
archetype = subfolder
                        if __settings__[1]: # Debugging
print(f"Found archetype: {archetype}")
else:
pass
return archetype
                    # If all plus keywords found match those of the subfolder then save the subfolder as archetype and return it to the function call
# We check equality without regarding order of appearance using set()
if set(sorted_plus) == set(sorted_found_plus):
archetype = subfolder
                        if __settings__[1]: # Debugging
print(f"Found archetype: {archetype}")
else:
pass
return archetype
except Exception as e:
        if __settings__[1]: # Debugging
print(f"Failed to find file archetype with error: {e}")
else:
pass
archetype = None
return archetype
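# A minimal sketch of the assumed "scheme" structure in settings.json (keys and
# keywords below are hypothetical; only the "-"/"&"/"+" layout is read by the code):
#
#     "scheme": [{
#         "ROCK":    [{"-": ["brick"], "&": ["cliff", "boulder"], "+": ["rock"]}],
#         "SURFACE": [{"-": [],        "&": ["floor"],            "+": ["surface"]}]
#     }]
#
# archetype_determiner() returns the first archetype whose "&" keywords match a file
# name with none of its "-" keywords present, or whose "+" keywords all match.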
def normalize_aspect_ratio_change(original_width, original_height, resized_width, resized_height, decimals=2):
"""Function for calculating decimal change in width and height from a original image to a resized image normalised as values between 0 and 2, with 1 being no change."""
width_change_percentage = ((resized_width - original_width) / original_width) * 100
height_change_percentage = ((resized_height - original_height) / original_height) * 100
# Normalize width and height changes to be between -100 and 100
normalized_width_change = width_change_percentage / 100
normalized_height_change = height_change_percentage / 100
# Scale the normalized values to be between 0 and 2 with 1 being unchanged
normalized_width_change = min(max(normalized_width_change + 1, 0), 2)
normalized_height_change = min(max(normalized_height_change + 1, 0), 2)
closest_value_to_one = min(abs(normalized_width_change), abs(normalized_height_change))
scale_factor = 1 / closest_value_to_one if closest_value_to_one else 1
scaled_normalized_width_change = scale_factor * normalized_width_change
scaled_normalized_height_change = scale_factor * normalized_height_change
# Round the normalized values to the specified number of decimals
output_width = round(scaled_normalized_width_change, decimals)
output_height = round(scaled_normalized_height_change, decimals)
if str(output_width) == "1.0":
output_width = int(1)
if str(output_height) == "1.0":
output_height = int(1)
if original_width == original_height or output_width == output_height:
output = "EVEN"
return output, normalized_width_change, normalized_height_change
elif output_width != 1 and output_height == 1:
output = f"X{str(output_width).replace('.', '')}"
return output, normalized_width_change, normalized_height_change
elif output_height != 1 and output_width == 1:
output = f"Y{str(output_height).replace('.', '')}"
return output, normalized_width_change, normalized_height_change
else:
output = f"X{str(output_width).replace('.', '')}Y{str(output_height).replace('.', '')}"
return output, normalized_width_change, normalized_height_change
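# Worked example: a 1920x1080 image resized to 2048x2048 changes width by a factor
# of 2048/1920 = 1.067 and height by 2048/1080 = 1.896; scaling both so the smaller
# factor becomes 1 gives (1.0, 1.78), which is encoded as "Y178".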
def resize_images(path, new_name_seperator, number_of_decimals):
download_source = path.split("\\", 1)[0]
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
if matching_entries:
lookup_table_greatest_axis = []
for entry in matching_entries:
lookup_table_greatest_axis.extend(entry["lookup_table_greatest_axis"])
for filename in os.listdir(path):
file_name_parts = split_filename(os.path.basename(filename), new_name_seperator)
file_type_part = split_filename(filename, '.')
            if filename.endswith(('.jpg', '.png', '.tif')):
image_path = os.path.join(path, filename)
img = cv2.imread(image_path)
original_height, original_width, _ = img.shape
if original_width > int((lookup_table_greatest_axis[0])[0]) and original_height > int((lookup_table_greatest_axis[0])[0]):
img = cv2.resize(img, (int((lookup_table_greatest_axis[0])[0]), int((lookup_table_greatest_axis[0])[0])))
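                # Round each original dimension up to the next power of two, e.g. 1080 -> 2048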
original_resized_width = 2 ** (original_width - 1).bit_length()
original_resized_height = 2 ** (original_height - 1).bit_length()
                try:
                    if __settings__[1]: # Debugging
                        print(f"Attempting to resize image to 'width x height': {original_resized_width} x {original_resized_height}")
                    img = cv2.resize(img, (original_resized_width, original_resized_height))
                    if __settings__[1]: # Debugging
                        print(f"Resized image to 'width x height': {original_resized_width} x {original_resized_height}")
                except Exception as e:
                    if __settings__[1]: # Debugging
                        print(f"Failed resizing image to 'width x height': {original_resized_width} x {original_resized_height}\nwith error: {e}")
for element in lookup_table_greatest_axis:
if original_resized_width == element[0] or original_resized_height == element[0]:
resolution = element[1]
break
scaling_value, width_change, height_change = normalize_aspect_ratio_change(original_width, original_height, original_resized_width, original_resized_height, number_of_decimals)
                try:
                    if __settings__[1]: # Debugging
                        print(f"Attempting to save resized image with 'width x height': {original_resized_width} x {original_resized_height}\nAnd aspect ratio: {width_change}x{height_change}\nFormatted as: {scaling_value}")
                    temp_path = os.path.join(path, "temp.jpg") # Temporary file path
                    cv2.imwrite(temp_path, img, [cv2.IMWRITE_JPEG_QUALITY, 98]) # Save with temporary high quality
                    img_temp = cv2.imread(temp_path)
                    cv2.imwrite(
                        os.path.join(path, f"{file_name_parts[0]}{new_name_seperator}{file_name_parts[1]}{new_name_seperator}{resolution}{new_name_seperator}{scaling_value}{new_name_seperator}{file_name_parts[2]}.{file_type_part[-1]}"), img_temp
                    ) # Save with desired quality
                    os.unlink(temp_path) # Remove temporary file
                    if __settings__[1]: # Debugging
                        print(f"Saved resized image with 'width x height': {original_resized_width} x {original_resized_height}\nAnd aspect ratio: {width_change}x{height_change}\nFormatted as: {scaling_value}")
                except Exception as e:
                    if __settings__[1]: # Debugging
                        print(f"Failed to save resized image with 'width x height': {original_resized_width} x {original_resized_height}\nAnd aspect ratio: {width_change}x{height_change}\nFormatted as: {scaling_value}\nWith error: {e}")
downscale_resized_width = original_resized_width
downscale_resized_height = original_resized_height
upscale_resized_width = original_resized_width
upscale_resized_height = original_resized_height
                # Work out the starting resolution tag from the lookup table
                for element in reversed(lookup_table_greatest_axis):
                    if original_resized_width == element[0] or original_resized_height == element[0]:
                        resolution = element[1]
                        break
                # Halve the image repeatedly, saving each step, until the smallest resolution in the lookup table is reached
                # (this assumes each halved dimension matches a lookup-table entry so 'resolution' keeps updating)
                while resolution != (lookup_table_greatest_axis[-1])[1]:
                    img_copy = img.copy()
                    downscale_resized_width /= 2
                    downscale_resized_height /= 2
                    downscaled_img = cv2.resize(
                        img_copy, (int(downscale_resized_width), int(downscale_resized_height)))
                    for element in lookup_table_greatest_axis:
                        if downscale_resized_width == element[0] or downscale_resized_height == element[0]:
                            resolution = element[1]
                            break
                    temp_path = os.path.join(path, "temp.jpg") # Temporary file path
                    cv2.imwrite(temp_path, downscaled_img, [cv2.IMWRITE_JPEG_QUALITY, 98]) # Save with temporary high quality
                    downscaled_img_temp = cv2.imread(temp_path)
                    cv2.imwrite(
                        os.path.join(path, f"{file_name_parts[0]}{new_name_seperator}{file_name_parts[1]}{new_name_seperator}{resolution}{new_name_seperator}{scaling_value}{new_name_seperator}{file_name_parts[2]}.{file_type_part[-1]}"), downscaled_img_temp
                    ) # Save with desired quality
                    os.unlink(temp_path) # Remove temporary file
                    if __settings__[1]: # Debugging
                        print(f"Saved downscaled image with resolution '{resolution}' and dimensions 'width x height': {downscale_resized_width} x {downscale_resized_height}")
time.sleep(1)
os.unlink(image_path)
def split_filename(filename, separator):
"""Function used to get a list of all the parts that make up the file name"""
parts = filename.split(separator)
return parts
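# Example: split_filename("ROCK_Cliff_4K.jpg", "_") returns ["ROCK", "Cliff", "4K.jpg"]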
def move_to_trickle_down_layer(file_path, from_layer, to_layer, directory_list_to_search_in):
"""Function used when moving files to the next layer of the trickle-down structure"""
# Making variables accessible in scope of function
global __settings__
try:
        if __settings__[1]: # Debugging
            print(f"Attempting to move file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}: {file_path}")
        # Use split() method to extract the parent directory of the path
        parent = file_path.split("\\", 1)[0]
        if __settings__[1]: # Debugging
            print(f"Detected that file is within parent directory: {parent}")
        layer_string = f'{parent}\\{str(to_layer)}_'
        if __settings__[1]: # Debugging
            print(f"Searching for string '{layer_string}' in following directories: {directory_list_to_search_in}")
        # Variable for controlling breaking out of the loop early
        break_loop = False
        for folder, folders_list in zip(download_sources, directory_list_to_search_in):
            for directory in folders_list:
                full_path = os.path.join(folder["name"], directory)
                if __settings__[1]: # Debugging
                    print(f"Checking if {layer_string} is in: {full_path}")
                if layer_string in full_path:
                    target_path = f"{full_path}\\{os.path.basename(file_path)}"
                    file_mover(file_path, target_path)
                    break_loop = True
                if break_loop:
                    break
            if break_loop:
                break
    except Exception as e:
        if __settings__[1]: # Debugging
            print(f"Failed to move file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}: {file_path}\nWith error: {e}")
def extract_to_trickle_down_layer(file_path, from_layer, to_layer, directory_list_to_search_in):
"""Function used when extracting files to the next layer of the trickle-down structure"""
# Making variables accessible in scope of function
global __settings__
    if '.zip' in file_path:
        if __settings__[1]: # Debugging
            print(f"Attempting to extract file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}: {file_path}")
        # Use split() method to extract the parent directory of the path
        parent = file_path.split("\\", 1)[0]
        if __settings__[1]: # Debugging
            print(f"Detected that file is within parent directory: {parent}")
        break_loop = False
        for folder, folders_list in zip(download_sources, directory_list_to_search_in):
            for directory in folders_list:
                full_path = os.path.join(folder["name"], directory)
                layer_string = f'{parent}\\{str(to_layer)}_'
                if __settings__[1]: # Debugging
                    print(f"Checking if {layer_string} is in: {full_path}")
                if layer_string in full_path:
                    target_directory = full_path
                    file_extractor(file_path, target_directory)
                    break_loop = True
                if break_loop:
                    break
            if break_loop:
                break
    else:
        if __settings__[1]: # Debugging
            print(f"Error extracting file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}. File is not a .zip file: {file_path}")
def file_added_callback(file_path):
"""
Function called by the watcher class whenever a new file is added to a watched directory.
This function moves all .zip files to the first layer of the trickle-down structure.
"""
# Making variables accessible in scope of function
global __settings__
global process_directory_list
global download_sources
    file_path = str(file_path) # Convert file_path to a string
    if __settings__[1]: # Debugging
        print("New file detected in trickle-down layer 0:", file_path)
    # If the downloaded file has '.zip' in the name, move it to the first layer of the trickle-down structure
    if ".zip" in file_path:
        move_to_trickle_down_layer(file_path, from_layer=0, to_layer=1, directory_list_to_search_in=process_directory_list)
def callback_trickleDownLayer_1(file_path, archive_directories):
"""Function called when a new file is detected in the first layer of the trickle-down processing structure."""
# Making variables accessible in scope of function
global __settings__
global process_directory_list
if __settings__[1]: # Debugging
print(f"File arrived in trickle-down layer 1: {file_path}")
else:
pass
# Use split() method to extract the download_source directory of the path
download_source = file_path.split("\\", 1)[0]
# Find the matching entry for the current download_source directory in the JSON data
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
if matching_entries:
# Extract the "files_to_remove_from_directories" from all matching entries
zip_archive_directory_path = []
for entry in matching_entries:
zip_archive_directory_path.extend(entry["zip_archive_directory_path"])
break_loop = False
# Iterate trough the lists of directories we have specified to watch
for list in process_directory_list:
# Iterate trough the directories in the list
for directory in list:
# We check which watched directory the file is in
if directory in file_path:
# We unzip the file to second layer of the trickle-down struckture
extract_to_trickle_down_layer(file_path, from_layer=1, to_layer=2, directory_list_to_search_in=process_directory_list)
# We move the sourcefile to the archive directory
file_mover(file_path , os.path.join(zip_archive_directory_path[0], os.path.basename(file_path)))
break_loop = True
if break_loop:
break
if break_loop:
break
def callback_trickleDownLayer_2(file_path):
"""Function called when a new file is detected in the second layer of the trickle-down processing structure"""
if __settings__[1]: # Debugging
print(f"File arrived in trickle-down layer 2: {file_path}")
else:
pass
files_in_directory = get_files_in_directory(file_path)
if __settings__[1]: # Debugging
print(f"Checking if more than one set of files are present in file: {file_path}")
else:
pass
# Use split() method to extract the download_source directory of the path
download_source = file_path.split("\\", 1)[0]
# Find the matching entry for the current download_source directory in the JSON data
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
if matching_entries:
# Extract the "files_to_remove_from_directories" from all matching entries
filetype_indicating_a_set_of_files_list = []
for entry in matching_entries:
filetype_indicating_a_set_of_files_list.extend(entry["filetype_indicating_a_set_of_files"])
# Variable holding any files with filetypes that are both listed in the json file and found in the directory
files_found = []
for file in files_in_directory:
for filetype in filetype_indicating_a_set_of_files_list:
if filetype in file:
files_found.append(file)
if len(files_found) > 1:
if __settings__[1]: # Debugging
print(f"There are {len(files_found)} sets of files in file: {file_path}")
else:
pass
basenames = []
for file in files_found:
basename = (os.path.basename(file))[:-int(len(filetype_indicating_a_set_of_files_list[0]))]
basenames.append(basename)
if __settings__[1]: # Debugging
print(f"Found sets {basenames} in file: {file_path}")
else:
pass
new_directories = []
sets = []
            for name in basenames:
                new_file_directory_path = os.path.join(file_path, name)
                new_directories.append(new_file_directory_path)
files_in_set = []
for file in files_in_directory:
if name in os.path.basename(file):
files_in_set.append(file)
sets.append(files_in_set)
for directory in new_directories:
os.mkdir(directory)
for i in range(len(sets)):
for file in sets[i]:
shutil.move(file, os.path.join(os.path.join(os.path.dirname(file), basenames[i]), os.path.basename(file)))
            for directory in new_directories:
                # We move each set's directory to the third layer of the trickle-down structure
                move_to_trickle_down_layer(directory, from_layer=2, to_layer=3, directory_list_to_search_in=process_directory_list)
shutil.rmtree(file_path)
else:
move_to_trickle_down_layer(file_path, from_layer=2, to_layer=3, directory_list_to_search_in=process_directory_list)
def callback_trickleDownLayer_3(file_path, file_to_remove_list_of_lists):
"""Function called when a new file is detected in the third layer of the trickle-down processing structure"""
if __settings__[1]: # Debugging
print(f"File arrived in trickle-down layer 3: {file_path}")
else:
pass
files_in_directory = get_files_in_directory(file_path)
# Use split() method to extract the download_source directory of the path
download_source = file_path.split("\\", 1)[0]
# Find the matching entry for the current download_source directory in the JSON data
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
if matching_entries:
# Extract the "files_to_remove_from_directories" from all matching entries
files_to_remove_from_directories_list = []
for entry in matching_entries:
files_to_remove_from_directories_list.extend(entry["files_to_remove_from_directories"])
# Extract the "minimum_image_size" from all matching entries
minimum_image_size = []
for entry in matching_entries:
minimum_image_size.extend(entry["minimum_image_size"])
        try:
            if __settings__[1]: # Debugging
                print(f"Attempting to remove files: {files_to_remove_from_directories_list}\n from file: {file_path}")
# Iterate through the files in the directory
for file in files_in_directory:
for file_to_remove in files_to_remove_from_directories_list:
if file_to_remove.lower() in file.lower():
if __settings__[1]: # Debugging
print(f"Removing file: {file}")
else:
pass
# Remove the file
os.remove(file)
except Exception as e:
if __settings__[1]: # Debugging
print(f"Failed removing files from {file_path} with error {e}")
else:
pass
files_in_directory = get_files_in_directory(file_path)
try:
if __settings__[1]: # Debugging
print(f"Attempting to remove files with dimensions less than: {minimum_image_size[0]}x{minimum_image_size[1]}\nFrom file: {file_path}")
else:
pass
# Iterate through the files in the directory
for file in files_in_directory:
img = cv2.imread(file, 1)
if img is not None:
height, width, channels = img.shape
if height < minimum_image_size[0] and width < minimum_image_size[1]:
if __settings__[1]: # Debugging
print(f"Removing file: {file}")
else:
pass
# Remove the file
os.remove(file)
except Exception as e:
if __settings__[1]: # Debugging
print(f"Failed removing files from {file_path} with error {e}")
else:
pass
else:
if __settings__[1]: # Debugging
print(f"ERROR: No matching download_source directory found in JSON data for '{file_path}'.")
else:
pass
move_to_trickle_down_layer(file_path, from_layer=3, to_layer=4, directory_list_to_search_in=process_directory_list)
def callback_trickleDownLayer_4(file_path):
"""Function called when a new file is detected in the fourth layer of the trickle-down processing structure"""
global watcher
if __settings__[1]: # Debugging
print(f"File arrived in trickle-down layer 4: {file_path}")
else:
pass
# We determine what prefix the downloaded file shall get
file_archetype = archetype_determiner(file_path, extracted_json_settings)
# Use split() method to extract the download_source directory of the path
download_source = file_path.split("\\", 1)[0]
# Find the matching entry for the current download_source directory in the JSON data
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
    if matching_entries:
        # Extract the "new_name_seperator" from all matching entries
        new_name_seperator = []
        for entry in matching_entries:
            new_name_seperator.extend(entry["new_name_seperator"])
        # Extract the "filetype_indicating_a_set_of_files" from all matching entries
        filetype_indicating_a_set_of_files = []
        for entry in matching_entries:
            filetype_indicating_a_set_of_files.extend(entry["filetype_indicating_a_set_of_files"])
        # Extract the "model_output_path" from all matching entries
        model_output_path = []
        for entry in matching_entries:
            model_output_path.extend(entry["model_output_path"])
    new_dir_name = os.path.join(os.path.dirname(file_path), f"{file_archetype}{new_name_seperator[0]}{os.path.basename(file_path)}")
time.sleep(1)
    try:
        if __settings__[1]: # Debugging
            print(f"Attempting to rename file: {file_path}")
            print(f" to: {new_dir_name}")
        file_renamer(file_path, new_dir_name)
    except Exception as e:
        if __settings__[1]: # Debugging
            print(f"Failed renaming file: {file_path}")
            print(f" to: {new_dir_name}")
            print(f" with error: {e}")
files = get_files_in_directory(new_dir_name)
for file in files:
for file_type in filetype_indicating_a_set_of_files:
if file_type in file:
new_fbx_file_name = f"{file_archetype}{new_name_seperator[0]}{os.path.basename(file)}"
file_renamer(file, os.path.join(new_dir_name, new_fbx_file_name))
file_mover(os.path.join(new_dir_name, new_fbx_file_name), os.path.join(model_output_path[0], new_fbx_file_name))
if "None" in os.path.basename(new_dir_name):
if __settings__[1]: # Debugging
print(f"Failed finding file archetype. Archetype set to 'None'. Abandoning files.")
else:
pass
watcher.event_handler.process_new_files = True
if __settings__[1]: # Debugging
print(f"'process_new_files' is set to: {watcher.event_handler.process_new_files}")
else:
pass
else:
move_to_trickle_down_layer(new_dir_name, from_layer=4, to_layer=5, directory_list_to_search_in=process_directory_list)
def callback_trickleDownLayer_5(file_path):
"""Function called when a new file is detected in the sixth layer of the trickle-down processing structure"""
if __settings__[1]: # Debugging
print(f"File arrived in trickle-down layer 5: {file_path}")
else:
pass
# Use split() method to extract the download_source directory of the path
download_source = file_path.split("\\", 1)[0]
# Find the matching entry for the current download_source directory in the JSON data
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
    if matching_entries:
        # Extract the "scheme" from all matching entries
        scheme = []
        for entry in matching_entries:
            scheme.extend(entry["scheme"])
        # Extract the "new_name_seperator" from all matching entries
        new_name_seperator = []
        for entry in matching_entries:
            new_name_seperator.extend(entry["new_name_seperator"])
        # Extract the "original_name_seperator" from all matching entries
        original_name_seperator = []
        for entry in matching_entries:
            original_name_seperator.extend(entry["original_name_seperator"])
        # Extract the "original_name_comes_before_seperator_number" from all matching entries
        original_name_index = []
        for entry in matching_entries:
            original_name_index.extend(entry["original_name_comes_before_seperator_number"])
        # Extract the "maptype_comes_before_seperator_number" from all matching entries
        maptype_index = []
        for entry in matching_entries:
            maptype_index.extend(entry["maptype_comes_before_seperator_number"])
        # Extract the "maptype_keywords_ending_with_standardized_maptype" from all matching entries
        maptype_keywords_ending_with_standardized_maptype_list_of_lists = []
        for entry in matching_entries:
            maptype_keywords_ending_with_standardized_maptype_list_of_lists.extend(entry["maptype_keywords_ending_with_standardized_maptype"])
# Assuming there's only one dictionary in the "scheme" list
scheme_folder = list(scheme[0].keys())
files = get_files_in_directory(file_path)
file_archetype = "None"
# Get archetype from directory name and apply it to the name of all files within the directory
for archetype in scheme_folder:
if f"{archetype}{new_name_seperator[0]}" in file_path:
file_archetype = archetype
# Get all parts of file name and rename the file with the correct parts in the correct order
for file in files:
file_name_parts = split_filename(os.path.basename(file), original_name_seperator[0])
file_type_part = split_filename(file, '.')
if "col_var" in file.lower():
match = re.search(r'col_var(\d+)', file.lower())
if match:
variant_value = int(match.group(1))
new_file_name = os.path.join(os.path.dirname(file), f"{file_archetype}{new_name_seperator[0]}{file_name_parts[original_name_index[0]]}{new_name_seperator[0]}COL-{variant_value}{new_name_seperator[0]}.{file_type_part[-1]}")
else:
pass
elif "COL" in file or "col" in file:
new_file_name = os.path.join(os.path.dirname(file), f"{file_archetype}{new_name_seperator[0]}{file_name_parts[original_name_index[0]]}{new_name_seperator[0]}COL-1{new_name_seperator[0]}.{file_type_part[-1]}")
else:
maptype = file_name_parts[maptype_index[0]]
for a_list in maptype_keywords_ending_with_standardized_maptype_list_of_lists:
if maptype.lower() in list(map(str.lower, a_list)):
standardised_maptype = a_list[-1]
break
else:
standardised_maptype = f"{maptype}-UNRECOGNIZED"
new_file_name = os.path.join(os.path.dirname(file), f"{file_archetype}{new_name_seperator[0]}{file_name_parts[original_name_index[0]]}{new_name_seperator[0]}{standardised_maptype}{new_name_seperator[0]}.{file_type_part[-1]}")
file_renamer(file, new_file_name)
move_to_trickle_down_layer(file_path, from_layer=5, to_layer=6, directory_list_to_search_in=process_directory_list)
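# Illustrative outcome, assuming "_" as the new-name separator and archetype "ROCK":
# a file "Cliff_col.jpg" in this layer would be renamed to "ROCK_Cliff_COL-1_.jpg"
# before the directory is moved on to layer 6 (all names here are hypothetical).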
def callback_trickleDownLayer_6(file_path):
    """Function called when a new file is detected in the sixth layer of the trickle-down processing structure"""
    if __settings__[1]: # Debugging
        print(f"File arrived in trickle-down layer 6: {file_path}")
    # Use split() method to extract the download_source directory of the path
    download_source = file_path.split("\\", 1)[0]
    # Find the matching entry for the current download_source directory in the JSON data
    matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
    if matching_entries:
        # Extract the "new_name_seperator" from all matching entries
        new_name_seperator = []
        for entry in matching_entries:
            new_name_seperator.extend(entry["new_name_seperator"])
        # Extract the "aspect_ratio_number_of_decimals" from all matching entries
        aspect_ratio_number_of_decimals = []
        for entry in matching_entries:
            aspect_ratio_number_of_decimals.extend(entry["aspect_ratio_number_of_decimals"])
resize_images(file_path, new_name_seperator[0], int(aspect_ratio_number_of_decimals[0]))
move_to_trickle_down_layer(file_path, from_layer=6, to_layer=7, directory_list_to_search_in=process_directory_list)
def callback_trickleDownLayer_7(file_path):
"""Function called when a new file is detected in the seventh layer of the trickle-down processing structure"""
if __settings__[1]: # Debugging
print(f"File arrived in trickle-down layer 7: {file_path}")
else:
pass
# Use split() method to extract the download_source directory of the path
download_source = file_path.split("\\", 1)[0]
# Find the matching entry for the current download_source directory in the JSON data
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
    if matching_entries:
        # Extract the "output_paths" from all matching entries
        output_paths = []
        for entry in matching_entries:
            output_paths.extend(entry["output_paths"])
        # Extract the "scheme" from all matching entries
        scheme = []
        for entry in matching_entries:
            scheme.extend(entry["scheme"])
        # Extract the "new_name_seperator" from all matching entries
        new_name_seperator = []
        for entry in matching_entries:
            new_name_seperator.extend(entry["new_name_seperator"])
        # Extract the "unrecognized_maptype_output_path" from all matching entries
        unrecognized_maptype = []
        for entry in matching_entries:
            unrecognized_maptype.extend(entry["unrecognized_maptype_output_path"])
    # Assuming there's only one dictionary in the "scheme" list
    scheme_folder = list(scheme[0].keys())
files = get_files_in_directory(file_path)
for file in files:
if "UNRECOGNIZED" in file:
file_mover(file, os.path.join(unrecognized_maptype[0], os.path.basename(file)))
files = get_files_in_directory(file_path)
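    # The order of "output_paths" is assumed to match the order of the archetype keys in "scheme"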
for file in files:
for index, archetype in enumerate(scheme_folder):
if f"{archetype}{new_name_seperator[0]}" in file:
file_mover(file, os.path.join(output_paths[index], os.path.basename(file)))
if not os.listdir(file_path):
os.rmdir(file_path)
def load_settings_json():
"""Function for loading the settings.json file"""
# Making variables accessible in scope of function
global extracted_json_settings
global __settings__
if __settings__[1]: # Debugging
print(f"Attempting to load {__settings__[0]}.")
else:
pass
    # We attempt to load settings.json. If the file uses invalid syntax, this loop
    # notifies the user and retries instead of terminating the program
    while True:
        try:
            # Open the settings.json file in read mode and parse it into a Python object
            with open(__settings__[0], "r") as json_file:
                extracted_json_settings = json.load(json_file)
            break
        except Exception as e:
            if __settings__[1]: # Debugging
                print(f"Failed to load {__settings__[0]} with error: {e}")
                print("Retrying in 30 seconds")
            time.sleep(30)
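
# A sketch of the settings.json layout this script reads. Every key below is
# consumed somewhere in this file, but all example values are hypothetical:
#
#     {
#         "download_sources": [{
#             "name": "Downloads",
#             "process_directories_names": ["0_Incoming", "1_Zips", "2_Extracted", ...],
#             "queue_directory_path": ["Downloads\\Queue"],
#             "zip_archive_directory_path": ["Downloads\\ZipArchive"],
#             "model_output_path": ["Output\\Models"],
#             "output_paths": ["Output\\ROCK", "Output\\SURFACE"],
#             "unrecognized_maptype_output_path": ["Output\\Unrecognized"],
#             "files_to_remove_from_directories": ["preview", "thumbnail"],
#             "filetype_indicating_a_set_of_files": [".fbx"],
#             "minimum_image_size": [512, 512],
#             "lookup_table_greatest_axis": [[8192, "8K"], [4096, "4K"], [2048, "2K"], [1024, "1K"]],
#             "new_name_seperator": ["_"],
#             "original_name_seperator": ["_"],
#             "original_name_comes_before_seperator_number": [0],
#             "maptype_comes_before_seperator_number": [1],
#             "maptype_keywords_ending_with_standardized_maptype": [["albedo", "diffuse", "COL"]],
#             "aspect_ratio_number_of_decimals": [2],
#             "scheme": [{"ROCK": [{"-": [], "&": ["rock"], "+": []}]}]
#         }]
#     }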
def create_directories_from_json_file(directory_list):
"""Function for handling creation of directories listed in .json file"""
# Making variables accessible in scope of function
global __settings__
# If the system shall create missing directories, then we check if any of the directories from settings.json does not exist, and if so we create it
if __settings__[2]:
for folder, folders_list in zip(download_sources, directory_list):
for directory in folders_list:
if ".\\" in directory:
full_path = directory
else:
full_path = os.path.join(folder["name"], directory)
if not os.path.isdir(full_path):
try:
if __settings__[1]: # Debugging
print(f"Attempting to create directory: {full_path}")
else:
pass
os.makedirs(full_path)
if __settings__[1]: # Debugging
print(f"Successfully created directory: {full_path}")
else:
pass
except Exception as e:
if __settings__[1]: # Debugging
print(
f"Failed to create directory {full_path} with error: {e}"
)
else:
pass
def init():
"""Function for initializing the watcher. This is also the callback function for when settings.json is updated, so that the watcher is always up to date on its settings."""
global watcher
# Making variables accessible in the scope of the function
global directory_list
global extracted_json_settings
global process_directory_list
global download_sources
# Load the settings.json file
load_settings_json()
# Extract all download folders from the .json file
download_sources = extracted_json_settings["download_sources"]
# Extract the "process_directories_names" for each item in "download_sources"
process_directory_list = [
folder["process_directories_names"]
for folder in extracted_json_settings["download_sources"]
]
# Extract the "zip_archive_directory_path" for each item in "download_sources"
archive_directory_list = [
folder["zip_archive_directory_path"]
for folder in extracted_json_settings["download_sources"]
]
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
files_to_remove_from_directories_list = [
folder["files_to_remove_from_directories"]
for folder in extracted_json_settings["download_sources"]
]
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
unrecognized_maptype_output_path = [
folder["unrecognized_maptype_output_path"]
for folder in extracted_json_settings["download_sources"]
]
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
fbx_output_path = [
folder["model_output_path"]
for folder in extracted_json_settings["download_sources"]
]
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
output_paths = [
folder["output_paths"]
for folder in extracted_json_settings["download_sources"]
]
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
queue_directory_path = [
folder["queue_directory_path"]
for folder in extracted_json_settings["download_sources"]
]
# Create directories listed in "process_directories_names" tags in the json file
create_directories_from_json_file(process_directory_list)
# Create directories listed in "zip_archive_directory_path" tags in the json file
create_directories_from_json_file(archive_directory_list)
# Create directories listed in "unrecognized_maptype_output_path" tags in the json file
create_directories_from_json_file(unrecognized_maptype_output_path)
# Create directories listed in "fbx_output_path" tags in the json file
create_directories_from_json_file(fbx_output_path)
# Create directories listed in "output_paths" tags in the json file
create_directories_from_json_file(output_paths)
# Create directories listed in "queue_directory_path" tags in the json file
create_directories_from_json_file(queue_directory_path)
process_directory_list_callback_functions = [
file_added_callback,
callback_trickleDownLayer_1,
callback_trickleDownLayer_2,
callback_trickleDownLayer_3,
callback_trickleDownLayer_4,
callback_trickleDownLayer_5,
callback_trickleDownLayer_6,
callback_trickleDownLayer_7
]
misc_passtrough = [archive_directory_list, files_to_remove_from_directories_list]
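    # The dispatcher hands misc_passtrough[0] (archive directories) to the layer-1
    # callback and misc_passtrough[1] (files to remove) to the layer-3 callback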
# Create an instance of DirectoryWatcher
watcher = DirectoryWatcher(
process_directory_list,
process_directory_list_callback_functions,
__settings__[0],
init,
misc_passtrough
)
if __settings__[1]: # Debugging
print(f"Watching for new files in directories: {process_directory_list}")
else:
pass
# Start watching the directories
watcher.watch()
if __name__ == "__main__":
####################################
########## SETTINGS ##########
####################################
## Do touch ##
    json_file_name = "settings.json" # Name of the json file containing the settings for the watched directories
debug_prints = True # Bool for activating debug print statements
create_missing_folders = True # Bool for determining whether or not the system shall create missing folders.
delay = 0.5 # Seconds
## Do not touch ##
__settings__ = [json_file_name, debug_prints, create_missing_folders, delay]
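    # __settings__ indices used throughout the script:
    #   [0] json file name, [1] debug prints, [2] create missing folders, [3] delay in seconds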
####################################
########## Main ##########
####################################
# Init variables
download_sources = []
process_directory_list = []
archive_directory_list = []
    extracted_json_settings = {} # Populated as a dict by load_settings_json()
watcher = None
# Run initialization
init()