145 lines
4.5 KiB
Python
145 lines
4.5 KiB
Python
from ..types import File
|
|
from ..utils.scan_for_music import scan_for_music
|
|
from ..utils.load_tag_information import load_tag_information
|
|
from ..transcode_levels import print_transcode_levels, transcode_levels_list
|
|
from ..utils.transcoder import transcode, get_transcode_config
|
|
|
|
from os import makedirs as make_directories
|
|
from os.path import exists as path_exists
|
|
from shutil import copy as copy_file
|
|
from copy import deepcopy as deep_copy
|
|
|
|
|
|
class CopyCommandState:
    """Mutable working state for a single copy command run.

    Attributes:
        files: The files discovered under the source directory.
        transcoded_files: The files that have been copied, transcoded or
            skipped so far.
    """

    def __init__(self):
        # Build the lists per instance. As class-level defaults they would be
        # shared by every CopyCommandState, so appending to
        # ``transcoded_files`` in one command would leak into all the others.
        self.files: list[File] = []
        self.transcoded_files: list[File] = []
|
|
|
|
class CopyCommand():
    """Copy or transcode the music found under ``src`` into ``dest``.

    The work is driven by :meth:`run`, which scans the source, loads tag
    information, then copies or transcodes every file that was found.
    """

    def __init__(self,
                 src: str,
                 dest: str,
                 transcode_level: str,
                 single_directory: bool,
                 skip_existing: bool
                 ):
        """Store the command options and create an empty state.

        Args:
            src: Directory that is scanned for music.
            dest: Directory that receives the copied/transcoded files.
            transcode_level: ``"list"`` to print the available levels,
                ``"copy"`` to copy files unchanged, or one of the entries of
                ``transcode_levels_list``.
            single_directory: When true, every file is written directly into
                ``dest`` and a ``mappings.txt`` file is written alongside.
            skip_existing: When true, destinations that already exist are
                not overwritten.
        """
        self.src = src
        self.dest = dest
        self.transcode_level = transcode_level
        self.single_directory = single_directory
        self.skip_existing = skip_existing
        self.state = CopyCommandState()

    def run(self):
        """Execute the command.

        Raises:
            SystemExit: With code 0 after printing the levels when the
                transcode level is ``"list"``; with code 1 when duplicate
                filenames are found in single-directory mode.
        """
        if self.transcode_level == "list":
            print_transcode_levels()
            # ``exit()`` is injected by the ``site`` module for interactive
            # use and is not guaranteed to exist; raise SystemExit directly.
            raise SystemExit(0)

        print("Copying")
        self.scan_for_music()
        self.load_tag_information()
        if self.single_directory:
            self.check_for_collisions()
        self.transcode_files()
        if self.single_directory:
            self.create_mappings()

    def scan_for_music(self):
        """Scan ``src`` and store the discovered files in the state."""
        print("Scanning For Music")
        self.state.files = scan_for_music(self.src)

    def load_tag_information(self):
        """Load the tag information of every scanned file into ``file.tags``."""
        print("Loading Tag Information")

        for file in self.state.files:
            file.tags = load_tag_information(file)

    def check_for_collisions(self):
        """Abort when two scanned files share the same ``filename``.

        Raises:
            SystemExit: With code 1 when at least one duplicate is found, so
                that calling scripts can detect the failure.
        """
        print("Checking For Colisions")
        seen = set()
        dupes = []

        for file in self.state.files:
            filename = file.filename
            if filename in seen:
                dupes.append(filename)
            else:
                seen.add(filename)

        if dupes:
            print("Dupes Found:", dupes)
            print("Cannot continue using --single-directory")
            print("Please rename or remove duplicates")
            raise SystemExit(1)

    def _destination_for(self, file: "File") -> str:
        """Return the path inside ``dest`` that ``file`` is written to."""
        if self.single_directory:
            relative = file.join_filename()
        else:
            relative = file.join_path_from_src()
        return self.dest + "/" + relative

    def _transcode_copy(self, file: "File"):
        """Copy ``file`` unchanged to its destination.

        The file is recorded in ``state.transcoded_files`` whether it was
        copied or skipped.
        """
        src = file.join_path_to()
        dest = self._destination_for(file)

        if self.skip_existing and path_exists(dest):
            print(
                "Skipping",
                src,
                "as already is copied at",
                dest)
        else:
            print("Copying", src, "to", dest)
            copy_file(src, dest)

        self.state.transcoded_files.append(file)

    def _transcode_with_level(self, file: "File", level: str):
        """Transcode ``file`` to its destination using ``level``.

        A deep copy of ``file`` carrying the output extension is recorded in
        ``state.transcoded_files`` whether the file was transcoded or skipped.
        """
        trans_config = get_transcode_config(file, level)

        src = file.join_path_to()

        # The destination is derived from a copy so the scanned file keeps
        # its original extension.
        new_file = deep_copy(file)
        new_file.extension = trans_config.file_extension

        dest_filepath = self._destination_for(new_file)

        if self.skip_existing and path_exists(dest_filepath):
            print("Skipping transcoding", dest_filepath)
        else:
            print("Transcoding", src, "to", dest_filepath)
            transcode(file, trans_config, dest_filepath)

        self.state.transcoded_files.append(new_file)

    def transcode_files(self):
        """Create the destination directories, then copy or transcode.

        An unrecognised transcode level processes no files; a warning is
        printed so the no-op is visible.
        """
        print("Transcoding Files")

        # The destination root must exist in every mode; without this,
        # single-directory mode fails when ``dest`` has not been created yet.
        if self.dest:
            make_directories(self.dest, exist_ok=True)

        if not self.single_directory:
            directories = {file.path_from_src for file in self.state.files}
            for directory in directories:
                make_directories(
                    self.dest + "/" + directory, exist_ok=True)

        if self.transcode_level == "copy":
            for file in self.state.files:
                self._transcode_copy(file)
            return
        elif self.transcode_level in transcode_levels_list:
            for file in self.state.files:
                self._transcode_with_level(
                    file, self.transcode_level)
        else:
            print("Unknown transcode level:", self.transcode_level)

    def create_mappings(self):
        """Write ``mappings.txt`` into ``dest``.

        Each line has the form ``<path_from_src> <- <filename>`` for one
        scanned file.
        """
        # Music names are frequently non-ASCII; write UTF-8 explicitly rather
        # than relying on the platform default encoding.
        with open(self.dest + "/" + "mappings.txt", "w",
                  encoding="utf-8") as f:
            f.write("\n".join([
                f"{file.path_from_src} <- {file.filename}"
                for file in self.state.files
            ]))
|