from ..types import File from ..utils.scan_for_music import scan_for_music from ..utils.load_tag_information import load_tag_information from ..transcode_levels import transcode_levels from ..utils.transcoder import transcode, get_transcode_config from os import makedirs as make_directories from os.path import exists as path_exists from shutil import copy as copy_file from copy import deepcopy as deep_copy class CopyCommandState: files: list[File] = [] transcoded_files: list[File] = [] class CopyCommand(): def __init__(self, src: str, dest: str, transcode_level: str, single_directory: bool, skip_existing: bool ): self.src = src self.dest = dest self.transcode_level = transcode_level self.single_directory = single_directory self.skip_existing = skip_existing self.state = CopyCommandState() def run(self): print("Copying") self.scan_for_music() self.load_tag_information() if self.single_directory: self.check_for_collisions() self.transcode_files() if self.single_directory: self.create_mappings() def scan_for_music(self): print("Scanning For Music") self.state.files = scan_for_music(self.src) def load_tag_information(self): print("Loading Tag Information") for file in self.state.files: file.tags = load_tag_information(file) def check_for_collisions(self): print("Checking For Colisions") seen = set() dupes = [] for file in self.state.files: filename = file.filename if filename in seen: dupes.append(filename) else: seen.add(filename) if len(dupes) > 0: print("Dupes Found:", dupes) print("Cannot continue using --single-directory") print("Please rename or remove duplicates") exit() def _transcode_copy(self, file: File): src = file.join_path_to() dest = file.join_filename( ) if self.single_directory else file.join_path_from_src() dest = self.dest + "/" + dest exists = path_exists(dest) if (self.skip_existing and not exists) or not self.skip_existing: print("Copying", src, "to", dest) copy_file( src, dest, ) else: print( "Skipping", src, "as already is copied at", dest) self.state.transcoded_files.append(file) def _transcode_with_level(self, file: File, level: str): trans_config = get_transcode_config(file, level) src = file.join_path_to() new_file = deep_copy(file) new_file.extension = trans_config.file_extension dest_filepath = new_file.join_filename( ) if self.single_directory else new_file.join_path_from_src() dest_filepath = self.dest + "/" + dest_filepath if (self.skip_existing and path_exists(dest_filepath)): print("Skipping transcoding", dest_filepath) self.state.transcoded_files.append(new_file) return print("Transcoding", src, "to", dest_filepath) transcode(file, trans_config, level, dest_filepath) self.state.transcoded_files.append(new_file) def transcode_files(self): print("Transcoding Files") if not self.single_directory: directories = set() for file in self.state.files: directories.add(file.path_from_src) for dir in directories: make_directories( self.dest + "/" + dir, exist_ok=True) if self.transcode_level == "copy": for file in self.state.files: self._transcode_copy(file) return elif self.transcode_level in transcode_levels: for file in self.state.files: self._transcode_with_level( file, self.transcode_level) def create_mappings(self): with open(self.dest + "/" + "mappings.txt", "w") as f: f.write("\n".join([ f"{file.path_from_src} <- {file.filename}" for file in self.state.files ]))