1639 lines
66 KiB
Python
1639 lines
66 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import shutil
|
|
import zipfile
|
|
from zipfile import ZipFile
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
from pathlib import Path # Import pathlib
|
|
import cv2
|
|
import re
|
|
|
|
|
|
# Define a class for watching directories and handling file events.
|
|
class DirectoryWatcher:
|
|
|
|
# Constructor to initialize the class with required parameters.
|
|
def __init__(
|
|
self,
|
|
watched_directories,
|
|
trickle_down_callbacks,
|
|
json_path,
|
|
callback_json_file_update,
|
|
misc_passtrough
|
|
|
|
):
|
|
self.directories_to_watch = [
|
|
Path(parent["name"]) / Path(directory)
|
|
for parent in extracted_json_settings["download_sources"]
|
|
for directory in parent["process_directories_names"]
|
|
]
|
|
|
|
self.watched_directories = watched_directories
|
|
self.callbacks = trickle_down_callbacks
|
|
self.json_file_path = json_path
|
|
self.callback_json = callback_json_file_update
|
|
self.misc_passtrough = misc_passtrough
|
|
self.event_handler = None
|
|
|
|
# Method to start watching the directories and handle file events.
|
|
def watch(self):
|
|
|
|
# Create a dictionary to store the initial state of files in the watched directories.
|
|
file_dict = {}
|
|
for parent in extracted_json_settings["download_sources"]:
|
|
parent_name = Path(parent["name"])
|
|
for directory in parent["process_directories_names"]:
|
|
dir_path = parent_name / Path(directory)
|
|
# Store the initial state of files in the directory.
|
|
file_dict[dir_path] = set(os.listdir(dir_path))
|
|
self.file_dict = file_dict
|
|
|
|
# Get the modification time of the JSON file to monitor changes.
|
|
json_file_mtime = os.path.getmtime(self.json_file_path)
|
|
self.event_handler = CustomFileSystemEventHandler(self.callbacks, self.watched_directories, self.misc_passtrough)
|
|
observer = Observer()
|
|
for path in self.directories_to_watch:
|
|
# Add the event handler to each directory being watched.
|
|
observer.schedule(self.event_handler, path=path, recursive=False)
|
|
observer.start()
|
|
|
|
try:
|
|
while True:
|
|
# Sleep for a second to avoid excessive checks.
|
|
time.sleep(1)
|
|
# Check if the JSON file has been modified since the last check.
|
|
if os.path.getmtime(self.json_file_path) > json_file_mtime:
|
|
json_file_mtime = os.path.getmtime(self.json_file_path)
|
|
# Call the JSON update callback function.
|
|
self.callback_json()
|
|
except KeyboardInterrupt:
|
|
# Stop the observer in case of a keyboard interrupt (e.g., Ctrl+C).
|
|
observer.stop()
|
|
# Wait for the observer to finish and join the main thread.
|
|
observer.join()
|
|
|
|
# Define a custom FileSystemEventHandler to process file events.
|
|
class CustomFileSystemEventHandler(FileSystemEventHandler):
|
|
def __init__(self, callbacks, watched_directories, misc_passtrough):
|
|
super().__init__()
|
|
self.callbacks = callbacks
|
|
self.watched_directories = watched_directories
|
|
self.misc_passtrough = misc_passtrough
|
|
self.process_new_files = True
|
|
|
|
|
|
# Method to handle the "on_created" event when a file is created.
|
|
def on_created(self, event):
|
|
|
|
file_path = event.src_path
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "queue_directory_path" from all matching entries
|
|
queue_directory_path = []
|
|
for entry in matching_entries:
|
|
queue_directory_path.extend(entry["queue_directory_path"])
|
|
|
|
if matching_entries:
|
|
# Extract the "queue_directory_path" from all matching entries
|
|
process_directories_names = []
|
|
for entry in matching_entries:
|
|
process_directories_names.extend(entry["process_directories_names"])
|
|
|
|
download_folder = f"{os.path.join(download_source, process_directories_names[0])}"
|
|
|
|
print(f"File created: {file_path}")
|
|
|
|
print(f"Watched_directories: {self.watched_directories}")
|
|
|
|
break_loop = False
|
|
|
|
# Trigger the corresponding callback for non-zip files based on the directory path
|
|
for list in self.watched_directories:
|
|
print(f"List: {list}")
|
|
|
|
for index, directory in enumerate(list):
|
|
print(f"Directory: {directory}")
|
|
print(f"File path: {file_path}")
|
|
print(f"Index: {index}")
|
|
|
|
# If new file is in layer 7 of trickle-down structure, then we set stop_processing_new_files false so we begin handling files again
|
|
if f'\\{directory}\\' in file_path and index == 7:
|
|
|
|
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
|
|
|
|
self.callbacks[index](file_path)
|
|
|
|
self.process_new_files = True
|
|
|
|
print(f"'process_new_files' is set to: {self.process_new_files}")
|
|
|
|
files_in_queue = get_files_in_directory(queue_directory_path[0])
|
|
|
|
print(f"Current files in queue: {files_in_queue}")
|
|
|
|
if files_in_queue:
|
|
print(f"Getting next file from queue: {files_in_queue[0]}")
|
|
|
|
# Sort the list of files by file size
|
|
files_in_queue.sort(key=lambda x: os.path.getsize(x))
|
|
|
|
file_mover(files_in_queue[0], os.path.join(download_folder, os.path.basename(files_in_queue[0])))
|
|
|
|
|
|
break_loop = True
|
|
|
|
|
|
# If new file is in layer 4 of trickle-down structure we also pass the list of files to remove from directories
|
|
elif f'\\{directory}\\' in file_path and index == 4:
|
|
|
|
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
|
|
|
|
self.callbacks[index](file_path)
|
|
|
|
if self.process_new_files:
|
|
|
|
files_in_queue = get_files_in_directory(queue_directory_path[0])
|
|
|
|
print(f"Current files in queue: {files_in_queue}")
|
|
|
|
if files_in_queue:
|
|
print(f"Getting next file from queue: {files_in_queue[0]}")
|
|
|
|
# Sort the list of files by file size
|
|
files_in_queue.sort(key=lambda x: os.path.getsize(x))
|
|
|
|
file_mover(files_in_queue[0], os.path.join(download_folder, os.path.basename(files_in_queue[0])))
|
|
|
|
break_loop = True
|
|
|
|
|
|
# If new file is in layer 3 of trickle-down structure we also pass the list of files to remove from directories
|
|
elif f'\\{directory}\\' in file_path and index == 3:
|
|
|
|
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
|
|
|
|
self.callbacks[index](file_path, self.misc_passtrough[1])
|
|
|
|
break_loop = True
|
|
|
|
# If new file is in layer 1 of trickle-down structure we also pass the list of archive directories
|
|
elif f'\\{directory}\\' in file_path and index == 1:
|
|
|
|
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
|
|
|
|
self.callbacks[index](file_path, self.misc_passtrough[0])
|
|
|
|
break_loop = True
|
|
|
|
|
|
# If new file is in layer 0 of trickle-down structure and stop_processing_new_files is false, then we set stop_processing_new_files true so we do not handle any more files before stop_processing_new_files is set false again
|
|
elif f'\\{directory}\\' in file_path and index == 0 and self.process_new_files:
|
|
|
|
if ".zip" in file_path:
|
|
self.process_new_files = False
|
|
|
|
print(f"'process_new_files' is set to: {self.process_new_files}")
|
|
|
|
|
|
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
|
|
|
|
self.callbacks[index](file_path)
|
|
|
|
|
|
break_loop = True
|
|
|
|
|
|
# Otherwise we just pass the file path
|
|
elif f'\\{directory}\\' in file_path:
|
|
|
|
print(f"'process_new_files' is set to: {self.process_new_files}")
|
|
|
|
print(f"Calling callback {self.callbacks[index].__name__} for file: {file_path}")
|
|
if index == 0 and ".zip" in file_path:
|
|
print(f"Moving file: {file_path}\nTo queue at path: {str(os.path.join(queue_directory_path[0], os.path.basename(file_path)))}")
|
|
|
|
file_mover(file_path, os.path.join(queue_directory_path[0], os.path.basename(file_path)))
|
|
|
|
else:
|
|
self.callbacks[index](file_path)
|
|
|
|
break_loop = True
|
|
|
|
if break_loop == True:
|
|
break
|
|
if break_loop == True:
|
|
break
|
|
|
|
def zip_files(files_to_zip, zip_filename):
|
|
with ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
current_directory = os.getcwd() # Store the current working directory
|
|
for file_path in files_to_zip:
|
|
if os.path.exists(file_path):
|
|
base_name = os.path.basename(file_path)
|
|
os.chdir(os.path.dirname(file_path)) # Change CWD to the file's directory
|
|
zipf.write(base_name) # Write the file to the zip (without the directory tree)
|
|
os.chdir(current_directory) # Change back to the original CWD
|
|
else:
|
|
print(f"File not found: {file_path}")
|
|
|
|
|
|
def file_extractor(from_path, extract_to_path):
|
|
"""Function for handling unzipping of new files"""
|
|
|
|
# Making variables accessible in the scope of the function
|
|
global __settings__
|
|
|
|
# Add a short delay before further operations to allow the watchdog library to release its lock on the file.
|
|
time.sleep(__settings__[3])
|
|
|
|
# Convert extract_to_path to a string
|
|
extract_to_path_str = str(extract_to_path)
|
|
|
|
# Creation of directory name for extracted directory
|
|
new_file_name = extract_to_path_str + from_path[from_path.rindex("\\") : -4]
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting extraction to: {new_file_name}")
|
|
else:
|
|
pass
|
|
|
|
# Attempt extracting all files from .zip file
|
|
try:
|
|
if os.path.isdir(extract_to_path_str):
|
|
with ZipFile(from_path, "r") as zipped_object:
|
|
zipped_object.extractall(new_file_name)
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully extracted new file to {new_file_name}")
|
|
else:
|
|
pass
|
|
else:
|
|
if __settings__[2]:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to create directory: {extract_to_path_str}")
|
|
else:
|
|
pass
|
|
os.mkdir(extract_to_path_str)
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully created directory: {extract_to_path_str}")
|
|
else:
|
|
pass
|
|
with ZipFile(from_path, "r") as zipped_object:
|
|
zipped_object.extractall(new_file_name)
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully extracted new file to {new_file_name}")
|
|
else:
|
|
pass
|
|
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed with error: {e}")
|
|
else:
|
|
pass
|
|
|
|
|
|
|
|
def file_mover(from_path, to_path):
|
|
"""Function for moving files from path to path"""
|
|
|
|
# Making variables accessible in scope of function
|
|
global __settings__
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to move file: {from_path}\n to: {to_path}")
|
|
else:
|
|
pass
|
|
|
|
# Add a short delay before further operations to allow the watchdog library to release its lock on the file.
|
|
time.sleep(__settings__[3])
|
|
|
|
# Attempt moving the file
|
|
try:
|
|
if os.path.exists(to_path):
|
|
if __settings__[1]: # Debugging
|
|
print(f"Duplicate found. Attempting to remove existing file or directory.")
|
|
else:
|
|
pass
|
|
|
|
try:
|
|
if os.path.isfile(to_path):
|
|
os.remove(to_path)
|
|
else:
|
|
shutil.rmtree(to_path)
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully removed existing file or directory.")
|
|
else:
|
|
pass
|
|
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed removing existing file or directory with error: {e}")
|
|
else:
|
|
pass
|
|
|
|
# Create the destination directory if it doesn't exist
|
|
os.makedirs(os.path.dirname(to_path), exist_ok=True)
|
|
|
|
shutil.move(from_path, to_path)
|
|
|
|
current_time = time.time()
|
|
|
|
os.utime(to_path, (current_time, current_time))
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully moved file: {from_path}\n to: {to_path}")
|
|
else:
|
|
pass
|
|
|
|
# In case of a failed attempt, handle error
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed moving file: {from_path}\n to: {to_path} \n with error: {e}")
|
|
|
|
|
|
def file_renamer(current_name, new_name):
|
|
"""Function for renaming files"""
|
|
|
|
# Making variables accessible in scope of function
|
|
global __settings__
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to rename file: {current_name}\n to: {new_name}")
|
|
else:
|
|
pass
|
|
|
|
# Add a short delay before further operations to allow the watchdog library to release its lock on the file.
|
|
time.sleep(__settings__[3])
|
|
|
|
# Attempt moving the file
|
|
try:
|
|
if os.path.exists(new_name):
|
|
if __settings__[1]: # Debugging
|
|
print(f"Duplicate found. Attempting to remove existing file or directory.")
|
|
else:
|
|
pass
|
|
try:
|
|
if os.path.isfile(new_name):
|
|
os.remove(new_name)
|
|
else:
|
|
shutil.rmtree(new_name)
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully removed existing file or directory.")
|
|
else:
|
|
pass
|
|
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed to remove existing file or directory with error: {e}")
|
|
else:
|
|
pass
|
|
|
|
os.rename(current_name, new_name)
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully renamed file: {current_name}\n to: {new_name}")
|
|
else:
|
|
pass
|
|
|
|
# In case of failed attempt, handle error
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed renamed file: {current_name}\n to: {new_name}\nwith error: {e}")
|
|
else:
|
|
pass
|
|
|
|
|
|
def get_files_in_directory(directory):
|
|
"""Function for returning all filenames of a specified directory"""
|
|
|
|
file_list = [] # List to store the file names
|
|
|
|
# Iterate over all files in the directory
|
|
for root, dirs, files in os.walk(directory):
|
|
for file in files:
|
|
file_path = os.path.join(root, file) # Get the full path of the file
|
|
file_list.append(file_path) # Add the file name to the list
|
|
return file_list
|
|
|
|
|
|
def archetype_determiner(directory_path, json_object):
|
|
"""Function for determining the type of data downloaded and adding the appropriate prefix to the naming scheme"""
|
|
|
|
#### Minus keywords is keywords which does not fit in a naming scheme.
|
|
#### And keywords is keywords which by themselves, when no minus keyword is present, can determine an archetype.
|
|
#### Plus keywords is keywords which is welcome in the namingscheme.
|
|
|
|
# Making variables accessible in the scope of the function
|
|
global __settings__
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to determine archetype for file: {directory_path}")
|
|
else:
|
|
pass
|
|
|
|
# Get the file names in the directory
|
|
files = get_files_in_directory(directory_path)
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = directory_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in json_object["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
scheme = []
|
|
for entry in matching_entries:
|
|
scheme.extend(entry["scheme"])
|
|
|
|
# Assuming there's only one dictionary in the "scheme" list
|
|
scheme_folder = list(scheme[0].keys())
|
|
print(scheme_folder)
|
|
|
|
|
|
##########################################
|
|
###### Finding Keywords in filename ######
|
|
##########################################
|
|
|
|
# Lists for keeping the found keywords. Initiated as empty.
|
|
found_minus = []
|
|
found_and = []
|
|
found_plus = []
|
|
|
|
# Iterate through the json file subfolders of the scheme folder
|
|
for subfolder in scheme_folder:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Searching for keywords of scheme: {subfolder}")
|
|
else:
|
|
pass
|
|
|
|
# Iterate through all files in the directory
|
|
for file in files:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Searching for keywords in file: {file}")
|
|
else:
|
|
pass
|
|
|
|
# Iterate through all keywords listed in the minus subsubfolder of the current subfolder of the scheme folder
|
|
for keyword in (scheme[0][subfolder])[0]["-"]:
|
|
|
|
# We check if the current keyword is in the filename without regarding capital case letters.
|
|
# If True, then we append it to the corresponding list initialized earlier.
|
|
if keyword.lower() in file.lower():
|
|
found_minus.append(keyword)
|
|
|
|
# Iterate through all keywords listed in the and subsubfolder of the current subfolder of the scheme folder
|
|
for keyword in (scheme[0][subfolder])[0]["&"]:
|
|
|
|
# We check if the current keyword is in the filename without regarding capital case letters.
|
|
# If True, then we append it to the corresponding list initialized earlier.
|
|
if keyword.lower() in file.lower():
|
|
found_and.append(keyword)
|
|
|
|
# Iterate through all keywords listed in the plus subsubfolder of the current subfolder of the scheme folder
|
|
for keyword in (scheme[0][subfolder])[0]["+"]:
|
|
|
|
# We check if the current keyword is in the filename without regarding capital case letters.
|
|
# If True, then we append it to the corresponding list initialized earlier.
|
|
if keyword.lower() in file.lower():
|
|
found_plus.append(keyword)
|
|
|
|
if __settings__[0]: # Debugging
|
|
print(
|
|
f"Found following keywords: \n Minus : {found_minus} \n And : {found_and} \n Plus : {found_plus}"
|
|
)
|
|
else:
|
|
pass
|
|
|
|
########################################
|
|
###### Determining file archetype ######
|
|
########################################
|
|
|
|
# Making all found keywords lowercase, while also sorting alphabetically and remove duplicates
|
|
sorted_found_minus = sorted(set([item.lower() for item in found_minus]))
|
|
sorted_found_and = sorted(set([item.lower() for item in found_and]))
|
|
sorted_found_plus = sorted(set([item.lower() for item in found_plus]))
|
|
if __settings__[0]: # Debugging
|
|
print(
|
|
f"Sorting keywords alphabetically, removing duplicates and making keywords lowercase: \n Minus : {sorted_found_minus} \n And : {sorted_found_and} \n Plus : {sorted_found_plus}"
|
|
)
|
|
else:
|
|
pass
|
|
try:
|
|
# Iterate through the json file subfolders of the scheme folder
|
|
for subfolder in scheme_folder:
|
|
if __settings__[0]: # Debugging
|
|
print(
|
|
f"Attempting to find file archetype by comparing found keywords to the naming scheme: {subfolder}"
|
|
)
|
|
else:
|
|
pass
|
|
|
|
# Making all keywords of the subfolder lowercase, while also sorting alphabetically and remove duplicates
|
|
sorted_minus = sorted(
|
|
set([item.lower() for item in (scheme[0][subfolder])[0]["-"]])
|
|
)
|
|
sorted_and = sorted(
|
|
set([item.lower() for item in (scheme[0][subfolder])[0]["&"]])
|
|
)
|
|
sorted_plus = sorted(
|
|
set([item.lower() for item in (scheme[0][subfolder])[0]["+"]])
|
|
)
|
|
|
|
# Proceed if none of the minus keywords are found
|
|
if not any(keyword.lower() in sorted_found_minus for keyword in sorted_minus):
|
|
|
|
# If any and keywords found match those of the subfolder then save the subfolder as archetype and return it to function call
|
|
if any(keyword.lower() in sorted_found_and for keyword in sorted_and):
|
|
archetype = subfolder
|
|
|
|
if __settings__[0]: # Debugging
|
|
print(f"Found archetype: {archetype}")
|
|
else:
|
|
pass
|
|
return archetype
|
|
|
|
# If all plus keywords found match those of the subfolder then save the subfolder as archetype and return it to functioncall
|
|
# We check equality without regarding order of appearance using set()
|
|
if set(sorted_plus) == set(sorted_found_plus):
|
|
archetype = subfolder
|
|
if __settings__[0]: # Debugging
|
|
print(f"Found archetype: {archetype}")
|
|
else:
|
|
pass
|
|
return archetype
|
|
|
|
except Exception as e:
|
|
if __settings__[0]: # Debugging
|
|
print(f"Failed to find file archetype with error: {e}")
|
|
else:
|
|
pass
|
|
archetype = None
|
|
return archetype
|
|
|
|
|
|
def normalize_aspect_ratio_change(original_width, original_height, resized_width, resized_height, decimals=2):
|
|
"""Function for calculating decimal change in width and height from a original image to a resized image normalised as values between 0 and 2, with 1 being no change."""
|
|
|
|
width_change_percentage = ((resized_width - original_width) / original_width) * 100
|
|
height_change_percentage = ((resized_height - original_height) / original_height) * 100
|
|
|
|
# Normalize width and height changes to be between -100 and 100
|
|
normalized_width_change = width_change_percentage / 100
|
|
normalized_height_change = height_change_percentage / 100
|
|
|
|
# Scale the normalized values to be between 0 and 2 with 1 being unchanged
|
|
normalized_width_change = min(max(normalized_width_change + 1, 0), 2)
|
|
normalized_height_change = min(max(normalized_height_change + 1, 0), 2)
|
|
|
|
closest_value_to_one = min(abs(normalized_width_change), abs(normalized_height_change))
|
|
scale_factor = 1 / closest_value_to_one if closest_value_to_one else 1
|
|
|
|
scaled_normalized_width_change = scale_factor * normalized_width_change
|
|
scaled_normalized_height_change = scale_factor * normalized_height_change
|
|
|
|
# Round the normalized values to the specified number of decimals
|
|
output_width = round(scaled_normalized_width_change, decimals)
|
|
output_height = round(scaled_normalized_height_change, decimals)
|
|
|
|
if str(output_width) == "1.0":
|
|
output_width = int(1)
|
|
|
|
if str(output_height) == "1.0":
|
|
output_height = int(1)
|
|
|
|
if original_width == original_height or output_width == output_height:
|
|
output = "EVEN"
|
|
|
|
return output, normalized_width_change, normalized_height_change
|
|
|
|
elif output_width != 1 and output_height == 1:
|
|
output = f"X{str(output_width).replace('.', '')}"
|
|
|
|
return output, normalized_width_change, normalized_height_change
|
|
|
|
elif output_height != 1 and output_width == 1:
|
|
output = f"Y{str(output_height).replace('.', '')}"
|
|
|
|
return output, normalized_width_change, normalized_height_change
|
|
|
|
else:
|
|
output = f"X{str(output_width).replace('.', '')}Y{str(output_height).replace('.', '')}"
|
|
|
|
return output, normalized_width_change, normalized_height_change
|
|
|
|
def resize_images(path, new_name_seperator, number_of_decimals):
|
|
download_source = path.split("\\", 1)[0]
|
|
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
lookup_table_greatest_axis = []
|
|
for entry in matching_entries:
|
|
lookup_table_greatest_axis.extend(entry["lookup_table_greatest_axis"])
|
|
|
|
for filename in os.listdir(path):
|
|
file_name_parts = split_filename(os.path.basename(filename), new_name_seperator)
|
|
file_type_part = split_filename(filename, '.')
|
|
|
|
if filename.endswith('.jpg') or filename.endswith('.png') or filename.endswith('.tif'):
|
|
image_path = os.path.join(path, filename)
|
|
img = cv2.imread(image_path)
|
|
original_height, original_width, _ = img.shape
|
|
|
|
if original_width > int((lookup_table_greatest_axis[0])[0]) and original_height > int((lookup_table_greatest_axis[0])[0]):
|
|
img = cv2.resize(img, (int((lookup_table_greatest_axis[0])[0]), int((lookup_table_greatest_axis[0])[0])))
|
|
|
|
original_resized_width = 2 ** (original_width - 1).bit_length()
|
|
original_resized_height = 2 ** (original_height - 1).bit_length()
|
|
|
|
try:
|
|
if __settings__[0]: # Debugging
|
|
print(f"Attempting to resize image to 'width x height': {original_resized_width} x {original_resized_height}")
|
|
else:
|
|
pass
|
|
|
|
img = cv2.resize(img, (original_resized_width, original_resized_height))
|
|
|
|
if __settings__[0]: # Debugging
|
|
print(f"Resized image to 'width x height': {original_resized_width} x {original_resized_height}")
|
|
else:
|
|
pass
|
|
|
|
except Exception as e:
|
|
if __settings__[0]: # Debugging
|
|
print(f"Failed resizing image to 'width x height': {original_resized_width} x {original_resized_height}\nwith error: {e}")
|
|
else:
|
|
pass
|
|
|
|
for element in lookup_table_greatest_axis:
|
|
if original_resized_width == element[0] or original_resized_height == element[0]:
|
|
resolution = element[1]
|
|
break
|
|
|
|
scaling_value, width_change, height_change = normalize_aspect_ratio_change(original_width, original_height, original_resized_width, original_resized_height, number_of_decimals)
|
|
|
|
try:
|
|
if __settings__[0]: # Debugging
|
|
print(f"Attempting to save resized image with 'width x height': {original_resized_width} x {original_resized_height}\nAnd aspect ratio: {width_change}x{height_change}\nFormatted as: {scaling_value}")
|
|
else:
|
|
pass
|
|
|
|
temp_path = os.path.join(path, "temp.jpg") # Temporary file path
|
|
cv2.imwrite(temp_path, img, [cv2.IMWRITE_JPEG_QUALITY, 98]) # Save with temporary high quality
|
|
|
|
img_temp = cv2.imread(temp_path)
|
|
cv2.imwrite(
|
|
os.path.join(path, f"{file_name_parts[0]}{new_name_seperator}{file_name_parts[1]}{new_name_seperator}{resolution}{new_name_seperator}{scaling_value}{new_name_seperator}{file_name_parts[2]}.{file_type_part[-1]}"), img_temp
|
|
) # Save with desired quality
|
|
|
|
os.unlink(temp_path) # Remove temporary file
|
|
|
|
if __settings__[0]: # Debugging
|
|
print(f"Saved resized image with 'width x height': {original_resized_width} x {original_resized_height}\nAnd aspect ratio: {width_change}x{height_change}\nFormatted as: {scaling_value}")
|
|
else:
|
|
pass
|
|
|
|
except Exception as e:
|
|
if __settings__[0]: # Debugging
|
|
print(f"Failed to save resized image with 'width x height': {original_resized_width} x {original_resized_height}\nAnd aspect ratio: {width_change}x{height_change}\nFormatted as: {scaling_value}\nWith error: {e}")
|
|
else:
|
|
pass
|
|
|
|
downscale_resized_width = original_resized_width
|
|
downscale_resized_height = original_resized_height
|
|
|
|
upscale_resized_width = original_resized_width
|
|
upscale_resized_height = original_resized_height
|
|
|
|
if original_resized_width >= (lookup_table_greatest_axis[-1])[0] or original_resized_height >= (lookup_table_greatest_axis[-1])[0]:
|
|
for element in reversed(lookup_table_greatest_axis):
|
|
if original_resized_width == element[0] or original_resized_height == element[0]:
|
|
resolution = element[1]
|
|
break
|
|
|
|
while resolution is not (lookup_table_greatest_axis[-1])[1]:
|
|
img_copy = img.copy()
|
|
|
|
downscale_resized_width /= 2
|
|
downscale_resized_height /= 2
|
|
downscaled_img = cv2.resize(
|
|
img_copy, (int(downscale_resized_width), int(downscale_resized_height)))
|
|
|
|
for element in lookup_table_greatest_axis:
|
|
if downscale_resized_width == element[0] or downscale_resized_height == element[0]:
|
|
resolution = element[1]
|
|
break
|
|
|
|
temp_path = os.path.join(path, "temp.jpg") # Temporary file path
|
|
cv2.imwrite(temp_path, downscaled_img, [cv2.IMWRITE_JPEG_QUALITY, 98]) # Save with temporary high quality
|
|
|
|
downscaled_img_temp = cv2.imread(temp_path)
|
|
cv2.imwrite(
|
|
os.path.join(path, f"{file_name_parts[0]}{new_name_seperator}{file_name_parts[1]}{new_name_seperator}{resolution}{new_name_seperator}{scaling_value}{new_name_seperator}{file_name_parts[2]}.{file_type_part[-1]}"), downscaled_img_temp
|
|
) # Save with desired quality
|
|
|
|
os.unlink(temp_path) # Remove temporary file
|
|
|
|
if __settings__[0]: # Debugging
|
|
print(f"Saved downscaled image with resolution '{resolution}' and dimensions 'width x height': {downscale_resized_width} x {downscale_resized_height}")
|
|
else:
|
|
pass
|
|
|
|
else:
|
|
for element in reversed(lookup_table_greatest_axis):
|
|
if original_resized_width == element[0] or original_resized_height == element[0]:
|
|
resolution = element[1]
|
|
break
|
|
|
|
while resolution is not (lookup_table_greatest_axis[-1])[1]:
|
|
img_copy = img.copy()
|
|
|
|
downscale_resized_width /= 2
|
|
downscale_resized_height /= 2
|
|
downscaled_img = cv2.resize(
|
|
img_copy, (int(downscale_resized_width), int(downscale_resized_height)))
|
|
|
|
for element in lookup_table_greatest_axis:
|
|
if downscale_resized_width == element[0] or downscale_resized_height == element[0]:
|
|
resolution = element[1]
|
|
break
|
|
|
|
temp_path = os.path.join(path, "temp.jpg") # Temporary file path
|
|
cv2.imwrite(temp_path, downscaled_img, [cv2.IMWRITE_JPEG_QUALITY, 98]) # Save with temporary high quality
|
|
|
|
downscaled_img_temp = cv2.imread(temp_path)
|
|
cv2.imwrite(
|
|
os.path.join(path, f"{file_name_parts[0]}{new_name_seperator}{file_name_parts[1]}{new_name_seperator}{resolution}{new_name_seperator}{scaling_value}{new_name_seperator}{file_name_parts[2]}.{file_type_part[-1]}"), downscaled_img_temp
|
|
) # Save with desired quality
|
|
|
|
os.unlink(temp_path) # Remove temporary file
|
|
|
|
if __settings__[0]: # Debugging
|
|
print(f"Saved downscaled image with resolution '{resolution}' and dimensions 'width x height': {downscale_resized_width} x {downscale_resized_height}")
|
|
else:
|
|
pass
|
|
|
|
time.sleep(1)
|
|
|
|
os.unlink(image_path)
|
|
|
|
|
|
def split_filename(filename, separator):
|
|
"""Function used to get a list of all the parts that make up the file name"""
|
|
|
|
parts = filename.split(separator)
|
|
|
|
return parts
|
|
|
|
|
|
def move_to_trickle_down_layer(file_path, from_layer, to_layer, directory_list_to_search_in):
|
|
"""Function used when moving files to the next layer of the trickle-down structure"""
|
|
|
|
# Making variables accessible in scope of function
|
|
global __settings__
|
|
|
|
try:
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to move file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Use split() method to extract the parent directory of the path
|
|
parent = file_path.split("\\", 1)[0]
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Detected that file is within parent directory: {parent}")
|
|
else:
|
|
pass
|
|
|
|
|
|
layer_string = f'{parent}\\{str(to_layer)}_'
|
|
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Searching for string '{layer_string}' in following directories: {directory_list_to_search_in}")
|
|
else:
|
|
pass
|
|
|
|
# Variable for controlling breaking out of loop early
|
|
break_loop = False
|
|
|
|
for folder, folders_list in zip(download_sources, directory_list_to_search_in):
|
|
for directory in folders_list:
|
|
full_path = os.path.join(folder["name"], directory)
|
|
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Checking if {layer_string} is in: {full_path}")
|
|
else:
|
|
pass
|
|
|
|
if layer_string in full_path:
|
|
target_path = f"{full_path}\\{os.path.basename(file_path)}"
|
|
|
|
file_mover(file_path, target_path)
|
|
break_loop = True
|
|
|
|
if break_loop == True:
|
|
break
|
|
if break_loop == True:
|
|
break
|
|
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed to move file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}: {file_path}\nWith error: {e}")
|
|
else:
|
|
pass
|
|
|
|
|
|
def extract_to_trickle_down_layer(file_path, from_layer, to_layer, directory_list_to_search_in):
|
|
"""Function used when extracting files to the next layer of the trickle-down structure"""
|
|
|
|
# Making variables accessible in scope of function
|
|
global __settings__
|
|
|
|
if '.zip' in file_path:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to extract file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Use split() method to extract the parent directory of the path
|
|
parent = file_path.split("\\", 1)[0]
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Detected that file is within parent directory: {parent}")
|
|
else:
|
|
pass
|
|
|
|
break_loop = False
|
|
|
|
for folder, folders_list in zip(download_sources, directory_list_to_search_in):
|
|
for directory in folders_list:
|
|
full_path = os.path.join(folder["name"], directory)
|
|
|
|
layer_string = f'{parent}\\{str(to_layer)}_'
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Checking if {layer_string} is in: {full_path}")
|
|
else:
|
|
pass
|
|
|
|
if layer_string in full_path:
|
|
target_directory = full_path
|
|
|
|
file_extractor(file_path, target_directory)
|
|
break_loop = True
|
|
|
|
if break_loop == True:
|
|
break
|
|
if break_loop == True:
|
|
break
|
|
else:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Error extracting file from trickle-down layer {str(from_layer)} to trickle-down layer {str(to_layer)}. File is not a .zip file: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
|
|
def file_added_callback(file_path):
|
|
"""
|
|
Function called by the watcher class whenever a new file is added to a watched directory.
|
|
This function moves all .zip files to the first layer of the trickle-down structure.
|
|
"""
|
|
|
|
# Making variables accessible in scope of function
|
|
global __settings__
|
|
global process_directory_list
|
|
global download_sources
|
|
|
|
file_path = str(file_path) # Convert file_path to a string
|
|
|
|
if __settings__[1]: # Debugging
|
|
print("New file detected in trickle-down layer 0:", file_path)
|
|
else:
|
|
pass
|
|
|
|
# If the downloaded file has '.zip' in the name, then we move it to the first layer of the trickle-down structure. Else we pass.
|
|
if ".zip" in file_path:
|
|
|
|
move_to_trickle_down_layer(file_path, from_layer=0, to_layer=1, directory_list_to_search_in=process_directory_list)
|
|
|
|
else:
|
|
pass
|
|
|
|
|
|
def callback_trickleDownLayer_1(file_path, archive_directories):
|
|
"""Function called when a new file is detected in the first layer of the trickle-down processing structure."""
|
|
|
|
# Making variables accessible in scope of function
|
|
global __settings__
|
|
global process_directory_list
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"File arrived in trickle-down layer 1: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
zip_archive_directory_path = []
|
|
for entry in matching_entries:
|
|
zip_archive_directory_path.extend(entry["zip_archive_directory_path"])
|
|
|
|
break_loop = False
|
|
|
|
# Iterate trough the lists of directories we have specified to watch
|
|
for list in process_directory_list:
|
|
|
|
# Iterate trough the directories in the list
|
|
for directory in list:
|
|
|
|
# We check which watched directory the file is in
|
|
if directory in file_path:
|
|
|
|
# We unzip the file to second layer of the trickle-down struckture
|
|
extract_to_trickle_down_layer(file_path, from_layer=1, to_layer=2, directory_list_to_search_in=process_directory_list)
|
|
|
|
# We move the sourcefile to the archive directory
|
|
file_mover(file_path , os.path.join(zip_archive_directory_path[0], os.path.basename(file_path)))
|
|
|
|
break_loop = True
|
|
|
|
if break_loop:
|
|
break
|
|
if break_loop:
|
|
break
|
|
|
|
|
|
def callback_trickleDownLayer_2(file_path):
|
|
"""Function called when a new file is detected in the second layer of the trickle-down processing structure"""
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"File arrived in trickle-down layer 2: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
files_in_directory = get_files_in_directory(file_path)
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Checking if more than one set of files are present in file: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
filetype_indicating_a_set_of_files_list = []
|
|
for entry in matching_entries:
|
|
filetype_indicating_a_set_of_files_list.extend(entry["filetype_indicating_a_set_of_files"])
|
|
|
|
# Variable holding any files with filetypes that are both listed in the json file and found in the directory
|
|
files_found = []
|
|
for file in files_in_directory:
|
|
for filetype in filetype_indicating_a_set_of_files_list:
|
|
if filetype in file:
|
|
files_found.append(file)
|
|
|
|
if len(files_found) > 1:
|
|
if __settings__[1]: # Debugging
|
|
print(f"There are {len(files_found)} sets of files in file: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
basenames = []
|
|
for file in files_found:
|
|
basename = (os.path.basename(file))[:-int(len(filetype_indicating_a_set_of_files_list[0]))]
|
|
basenames.append(basename)
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Found sets {basenames} in file: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
new_directories = []
|
|
sets = []
|
|
for name in basenames:
|
|
|
|
current_file_directory_path = os.path.dirname(file_path)
|
|
new_file_directory_path = os.path.join(file_path, name)
|
|
new_directories.append(new_file_directory_path)
|
|
|
|
files_in_set = []
|
|
for file in files_in_directory:
|
|
if name in os.path.basename(file):
|
|
files_in_set.append(file)
|
|
|
|
sets.append(files_in_set)
|
|
|
|
for directory in new_directories:
|
|
os.mkdir(directory)
|
|
|
|
for i in range(len(sets)):
|
|
for file in sets[i]:
|
|
shutil.move(file, os.path.join(os.path.join(os.path.dirname(file), basenames[i]), os.path.basename(file)))
|
|
|
|
for directory in new_directories:
|
|
|
|
# We move the file to second layer of the trickle-down struckture
|
|
move_to_trickle_down_layer(directory, from_layer=2, to_layer=3, directory_list_to_search_in=process_directory_list)
|
|
|
|
shutil.rmtree(file_path)
|
|
|
|
else:
|
|
move_to_trickle_down_layer(file_path, from_layer=2, to_layer=3, directory_list_to_search_in=process_directory_list)
|
|
|
|
|
|
def callback_trickleDownLayer_3(file_path, file_to_remove_list_of_lists):
|
|
"""Function called when a new file is detected in the third layer of the trickle-down processing structure"""
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"File arrived in trickle-down layer 3: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
files_in_directory = get_files_in_directory(file_path)
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
files_to_remove_from_directories_list = []
|
|
for entry in matching_entries:
|
|
files_to_remove_from_directories_list.extend(entry["files_to_remove_from_directories"])
|
|
|
|
# Extract the "minimum_image_size" from all matching entries
|
|
minimum_image_size = []
|
|
for entry in matching_entries:
|
|
minimum_image_size.extend(entry["minimum_image_size"])
|
|
|
|
try:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to remove files: {file_to_remove_list_of_lists}\n from file: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Iterate through the files in the directory
|
|
for file in files_in_directory:
|
|
for file_to_remove in files_to_remove_from_directories_list:
|
|
if file_to_remove.lower() in file.lower():
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Removing file: {file}")
|
|
else:
|
|
pass
|
|
|
|
# Remove the file
|
|
os.remove(file)
|
|
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed removing files from {file_path} with error {e}")
|
|
else:
|
|
pass
|
|
|
|
files_in_directory = get_files_in_directory(file_path)
|
|
|
|
try:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to remove files with dimensions less than: {minimum_image_size[0]}x{minimum_image_size[1]}\nFrom file: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Iterate through the files in the directory
|
|
for file in files_in_directory:
|
|
img = cv2.imread(file, 1)
|
|
|
|
if img is not None:
|
|
height, width, channels = img.shape
|
|
|
|
if height < minimum_image_size[0] and width < minimum_image_size[1]:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Removing file: {file}")
|
|
else:
|
|
pass
|
|
|
|
# Remove the file
|
|
os.remove(file)
|
|
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed removing files from {file_path} with error {e}")
|
|
else:
|
|
pass
|
|
|
|
else:
|
|
if __settings__[1]: # Debugging
|
|
print(f"ERROR: No matching download_source directory found in JSON data for '{file_path}'.")
|
|
else:
|
|
pass
|
|
|
|
move_to_trickle_down_layer(file_path, from_layer=3, to_layer=4, directory_list_to_search_in=process_directory_list)
|
|
|
|
|
|
def callback_trickleDownLayer_4(file_path):
|
|
"""Function called when a new file is detected in the fourth layer of the trickle-down processing structure"""
|
|
|
|
global watcher
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"File arrived in trickle-down layer 4: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# We determine what prefix the downloaded file shall get
|
|
file_archetype = archetype_determiner(file_path, extracted_json_settings)
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
new_name_seperator = []
|
|
for entry in matching_entries:
|
|
new_name_seperator.extend(entry["new_name_seperator"])
|
|
|
|
if matching_entries:
|
|
# Extract the "filetype_indicating_a_set_of_files" from all matching entries
|
|
filetype_indicating_a_set_of_files = []
|
|
for entry in matching_entries:
|
|
filetype_indicating_a_set_of_files.extend(entry["filetype_indicating_a_set_of_files"])
|
|
|
|
if matching_entries:
|
|
# Extract the "model_output_path" from all matching entries
|
|
model_output_path = []
|
|
for entry in matching_entries:
|
|
model_output_path.extend(entry["model_output_path"])
|
|
|
|
new_dir_name = os.path.join(os.path.dirname(file_path), f"{file_archetype}{new_name_seperator[0]}{os.path.basename(file_path)}")
|
|
|
|
time.sleep(1)
|
|
|
|
try:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting renaiming file: {file_path}")
|
|
print(f" to: {new_dir_name}")
|
|
else: pass
|
|
file_renamer(file_path, new_dir_name)
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed renaming file: {file_path}")
|
|
print(f" to: {new_dir_name}")
|
|
print(f" with error: {e}")
|
|
else: pass
|
|
|
|
files = get_files_in_directory(new_dir_name)
|
|
|
|
for file in files:
|
|
for file_type in filetype_indicating_a_set_of_files:
|
|
if file_type in file:
|
|
new_fbx_file_name = f"{file_archetype}{new_name_seperator[0]}{os.path.basename(file)}"
|
|
file_renamer(file, os.path.join(new_dir_name, new_fbx_file_name))
|
|
file_mover(os.path.join(new_dir_name, new_fbx_file_name), os.path.join(model_output_path[0], new_fbx_file_name))
|
|
|
|
|
|
if "None" in os.path.basename(new_dir_name):
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed finding file archetype. Archetype set to 'None'. Abandoning files.")
|
|
else:
|
|
pass
|
|
|
|
|
|
watcher.event_handler.process_new_files = True
|
|
if __settings__[1]: # Debugging
|
|
print(f"'process_new_files' is set to: {watcher.event_handler.process_new_files}")
|
|
else:
|
|
pass
|
|
|
|
|
|
else:
|
|
move_to_trickle_down_layer(new_dir_name, from_layer=4, to_layer=5, directory_list_to_search_in=process_directory_list)
|
|
|
|
|
|
def callback_trickleDownLayer_5(file_path):
|
|
"""Function called when a new file is detected in the sixth layer of the trickle-down processing structure"""
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"File arrived in trickle-down layer 5: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "scheme" from all matching entries
|
|
scheme = []
|
|
for entry in matching_entries:
|
|
scheme.extend(entry["scheme"])
|
|
|
|
if matching_entries:
|
|
# Extract the "new_name_seperator" from all matching entries
|
|
new_name_seperator = []
|
|
for entry in matching_entries:
|
|
new_name_seperator.extend(entry["new_name_seperator"])
|
|
|
|
if matching_entries:
|
|
# Extract the "original_name_seperator" from all matching entries
|
|
original_name_seperator = []
|
|
for entry in matching_entries:
|
|
original_name_seperator.extend(entry["original_name_seperator"])
|
|
|
|
if matching_entries:
|
|
# Extract the "original_name_comes_before_seperator_number" from all matching entries
|
|
original_name_index = []
|
|
for entry in matching_entries:
|
|
original_name_index.extend(entry["original_name_comes_before_seperator_number"])
|
|
|
|
if matching_entries:
|
|
# Extract the "maptype_comes_before_seperator_number" from all matching entries
|
|
maptype_index = []
|
|
for entry in matching_entries:
|
|
maptype_index.extend(entry["maptype_comes_before_seperator_number"])
|
|
|
|
if matching_entries:
|
|
# Extract the "maptype_comes_before_seperator_number" from all matching entries
|
|
maptype_keywords_ending_with_standardized_maptype_list_of_lists = []
|
|
for entry in matching_entries:
|
|
maptype_keywords_ending_with_standardized_maptype_list_of_lists.extend(entry["maptype_keywords_ending_with_standardized_maptype"])
|
|
|
|
# Assuming there's only one dictionary in the "scheme" list
|
|
scheme_folder = list(scheme[0].keys())
|
|
|
|
files = get_files_in_directory(file_path)
|
|
|
|
file_archetype = "None"
|
|
|
|
# Get archetype from directory name and apply it to the name of all files within the directory
|
|
for archetype in scheme_folder:
|
|
if f"{archetype}{new_name_seperator[0]}" in file_path:
|
|
file_archetype = archetype
|
|
|
|
# Get all parts of file name and rename the file with the correct parts in the correct order
|
|
for file in files:
|
|
file_name_parts = split_filename(os.path.basename(file), original_name_seperator[0])
|
|
file_type_part = split_filename(file, '.')
|
|
|
|
if "col_var" in file.lower():
|
|
match = re.search(r'col_var(\d+)', file.lower())
|
|
|
|
if match:
|
|
variant_value = int(match.group(1))
|
|
|
|
new_file_name = os.path.join(os.path.dirname(file), f"{file_archetype}{new_name_seperator[0]}{file_name_parts[original_name_index[0]]}{new_name_seperator[0]}COL-{variant_value}{new_name_seperator[0]}.{file_type_part[-1]}")
|
|
|
|
else:
|
|
pass
|
|
|
|
|
|
elif "COL" in file or "col" in file:
|
|
new_file_name = os.path.join(os.path.dirname(file), f"{file_archetype}{new_name_seperator[0]}{file_name_parts[original_name_index[0]]}{new_name_seperator[0]}COL-1{new_name_seperator[0]}.{file_type_part[-1]}")
|
|
|
|
else:
|
|
maptype = file_name_parts[maptype_index[0]]
|
|
|
|
for a_list in maptype_keywords_ending_with_standardized_maptype_list_of_lists:
|
|
if maptype.lower() in list(map(str.lower, a_list)):
|
|
standardised_maptype = a_list[-1]
|
|
break
|
|
|
|
else:
|
|
standardised_maptype = f"{maptype}-UNRECOGNIZED"
|
|
|
|
new_file_name = os.path.join(os.path.dirname(file), f"{file_archetype}{new_name_seperator[0]}{file_name_parts[original_name_index[0]]}{new_name_seperator[0]}{standardised_maptype}{new_name_seperator[0]}.{file_type_part[-1]}")
|
|
|
|
file_renamer(file, new_file_name)
|
|
|
|
move_to_trickle_down_layer(file_path, from_layer=5, to_layer=6, directory_list_to_search_in=process_directory_list)
|
|
|
|
|
|
|
|
def callback_trickleDownLayer_6(file_path):
|
|
"""Function called when a new file is detected in the fifth layer of the trickle-down processing structure"""
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"File arrived in trickle-down layer 6: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
new_name_seperator = []
|
|
for entry in matching_entries:
|
|
new_name_seperator.extend(entry["new_name_seperator"])
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
aspect_ratio_number_of_decimals = []
|
|
for entry in matching_entries:
|
|
aspect_ratio_number_of_decimals.extend(entry["aspect_ratio_number_of_decimals"])
|
|
|
|
resize_images(file_path, new_name_seperator[0], int(aspect_ratio_number_of_decimals[0]))
|
|
|
|
move_to_trickle_down_layer(file_path, from_layer=6, to_layer=7, directory_list_to_search_in=process_directory_list)
|
|
|
|
|
|
|
|
def callback_trickleDownLayer_7(file_path):
|
|
"""Function called when a new file is detected in the seventh layer of the trickle-down processing structure"""
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"File arrived in trickle-down layer 7: {file_path}")
|
|
else:
|
|
pass
|
|
|
|
# Use split() method to extract the download_source directory of the path
|
|
download_source = file_path.split("\\", 1)[0]
|
|
|
|
# Find the matching entry for the current download_source directory in the JSON data
|
|
matching_entries = [entry for entry in extracted_json_settings["download_sources"] if entry["name"] in download_source]
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
output_paths = []
|
|
for entry in matching_entries:
|
|
output_paths.extend(entry["output_paths"])
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
scheme = []
|
|
for entry in matching_entries:
|
|
scheme.extend(entry["scheme"])
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
new_name_seperator = []
|
|
for entry in matching_entries:
|
|
new_name_seperator.extend(entry["new_name_seperator"])
|
|
|
|
if matching_entries:
|
|
# Extract the "files_to_remove_from_directories" from all matching entries
|
|
unrecognized_maptype = []
|
|
for entry in matching_entries:
|
|
unrecognized_maptype.extend(entry["unrecognized_maptype_output_path"])
|
|
|
|
scheme_folder = list(scheme[0].keys())
|
|
|
|
files = get_files_in_directory(file_path)
|
|
|
|
for file in files:
|
|
if "UNRECOGNIZED" in file:
|
|
file_mover(file, os.path.join(unrecognized_maptype[0], os.path.basename(file)))
|
|
|
|
files = get_files_in_directory(file_path)
|
|
|
|
for file in files:
|
|
for index, archetype in enumerate(scheme_folder):
|
|
if f"{archetype}{new_name_seperator[0]}" in file:
|
|
file_mover(file, os.path.join(output_paths[index], os.path.basename(file)))
|
|
if not os.listdir(file_path):
|
|
os.rmdir(file_path)
|
|
|
|
|
|
def load_settings_json():
|
|
"""Function for loading the settings.json file"""
|
|
|
|
# Making variables accessible in scope of function
|
|
global extracted_json_settings
|
|
global __settings__
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to load {__settings__[0]}.")
|
|
else:
|
|
pass
|
|
|
|
# We attempt to load settings.json. If the file is using wrong syntax, then this try except statement will notify the user without terminating the program
|
|
try:
|
|
# Open settings.json file in read mode and call it 'json_file'
|
|
with open(__settings__[0], "r") as json_file:
|
|
|
|
# Read all lines disregarding any whitespace (spaces and newlines)
|
|
lines = [line.rstrip() for line in json_file]
|
|
|
|
# Join the lines into a single string
|
|
json_string = "".join(lines)
|
|
|
|
# Parse the JSON string into a Python object
|
|
extracted_json_settings = json.loads(json_string)
|
|
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Failed to load {__settings__[0]} with error: {e}")
|
|
print("Retrying in 30 seconds")
|
|
time.sleep(30)
|
|
load_settings_json()
|
|
|
|
|
|
def create_directories_from_json_file(directory_list):
|
|
"""Function for handling creation of directories listed in .json file"""
|
|
|
|
# Making variables accessible in scope of function
|
|
global __settings__
|
|
|
|
# If the system shall create missing directories, then we check if any of the directories from settings.json does not exist, and if so we create it
|
|
if __settings__[2]:
|
|
for folder, folders_list in zip(download_sources, directory_list):
|
|
for directory in folders_list:
|
|
if ".\\" in directory:
|
|
full_path = directory
|
|
else:
|
|
full_path = os.path.join(folder["name"], directory)
|
|
if not os.path.isdir(full_path):
|
|
try:
|
|
if __settings__[1]: # Debugging
|
|
print(f"Attempting to create directory: {full_path}")
|
|
else:
|
|
pass
|
|
os.makedirs(full_path)
|
|
if __settings__[1]: # Debugging
|
|
print(f"Successfully created directory: {full_path}")
|
|
else:
|
|
pass
|
|
except Exception as e:
|
|
if __settings__[1]: # Debugging
|
|
print(
|
|
f"Failed to create directory {full_path} with error: {e}"
|
|
)
|
|
else:
|
|
pass
|
|
|
|
|
|
def init():
|
|
"""Function for initializing the watcher. This is also the callback function for when settings.json is updated, so that the watcher is always up to date on its settings."""
|
|
|
|
global watcher
|
|
|
|
# Making variables accessible in the scope of the function
|
|
global directory_list
|
|
global extracted_json_settings
|
|
global process_directory_list
|
|
global download_sources
|
|
|
|
# Load the settings.json file
|
|
load_settings_json()
|
|
|
|
# Extract all download folders from the .json file
|
|
download_sources = extracted_json_settings["download_sources"]
|
|
|
|
# Extract the "process_directories_names" for each item in "download_sources"
|
|
process_directory_list = [
|
|
folder["process_directories_names"]
|
|
for folder in extracted_json_settings["download_sources"]
|
|
]
|
|
|
|
# Extract the "zip_archive_directory_path" for each item in "download_sources"
|
|
archive_directory_list = [
|
|
folder["zip_archive_directory_path"]
|
|
for folder in extracted_json_settings["download_sources"]
|
|
]
|
|
|
|
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
|
|
files_to_remove_from_directories_list = [
|
|
folder["files_to_remove_from_directories"]
|
|
for folder in extracted_json_settings["download_sources"]
|
|
]
|
|
|
|
|
|
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
|
|
unrecognized_maptype_output_path = [
|
|
folder["unrecognized_maptype_output_path"]
|
|
for folder in extracted_json_settings["download_sources"]
|
|
]
|
|
|
|
|
|
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
|
|
fbx_output_path = [
|
|
folder["model_output_path"]
|
|
for folder in extracted_json_settings["download_sources"]
|
|
]
|
|
|
|
|
|
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
|
|
output_paths = [
|
|
folder["output_paths"]
|
|
for folder in extracted_json_settings["download_sources"]
|
|
]
|
|
|
|
# Extract the "files_to_remove_from_directories" for each item in "download_sources"
|
|
queue_directory_path = [
|
|
folder["queue_directory_path"]
|
|
for folder in extracted_json_settings["download_sources"]
|
|
]
|
|
|
|
# Create directories listed in "process_directories_names" tags in the json file
|
|
create_directories_from_json_file(process_directory_list)
|
|
|
|
# Create directories listed in "zip_archive_directory_path" tags in the json file
|
|
create_directories_from_json_file(archive_directory_list)
|
|
|
|
# Create directories listed in "unrecognized_maptype_output_path" tags in the json file
|
|
create_directories_from_json_file(unrecognized_maptype_output_path)
|
|
|
|
# Create directories listed in "fbx_output_path" tags in the json file
|
|
create_directories_from_json_file(fbx_output_path)
|
|
|
|
# Create directories listed in "output_paths" tags in the json file
|
|
create_directories_from_json_file(output_paths)
|
|
|
|
# Create directories listed in "queue_directory_path" tags in the json file
|
|
create_directories_from_json_file(queue_directory_path)
|
|
|
|
process_directory_list_callback_functions = [
|
|
file_added_callback,
|
|
callback_trickleDownLayer_1,
|
|
callback_trickleDownLayer_2,
|
|
callback_trickleDownLayer_3,
|
|
callback_trickleDownLayer_4,
|
|
callback_trickleDownLayer_5,
|
|
callback_trickleDownLayer_6,
|
|
callback_trickleDownLayer_7
|
|
]
|
|
|
|
misc_passtrough = [archive_directory_list, files_to_remove_from_directories_list]
|
|
|
|
# Create an instance of DirectoryWatcher
|
|
watcher = DirectoryWatcher(
|
|
process_directory_list,
|
|
process_directory_list_callback_functions,
|
|
__settings__[0],
|
|
init,
|
|
misc_passtrough
|
|
)
|
|
|
|
if __settings__[1]: # Debugging
|
|
print(f"Watching for new files in directories: {process_directory_list}")
|
|
else:
|
|
pass
|
|
|
|
# Start watching the directories
|
|
watcher.watch()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
####################################
|
|
########## SETTINGS ##########
|
|
####################################
|
|
|
|
## Do touch ##
|
|
json_file_name = (
|
|
"settings.json" # json file name contaning settings of the whatched directories
|
|
)
|
|
debug_prints = True # Bool for activating debug print statements
|
|
create_missing_folders = True # Bool for determining whether or not the system shall create missing folders.
|
|
delay = 0.5 # Seconds
|
|
|
|
|
|
## Do not touch ##
|
|
__settings__ = [json_file_name, debug_prints, create_missing_folders, delay]
|
|
|
|
####################################
|
|
########## Main ##########
|
|
####################################
|
|
|
|
# Init variables
|
|
download_sources = []
|
|
process_directory_list = []
|
|
archive_directory_list = []
|
|
extracted_json_settings = []
|
|
watcher = None
|
|
|
|
# Run initialization
|
|
init() |