148 lines
3.9 KiB
Python
148 lines
3.9 KiB
Python
![]() |
from .types import File
|
||
|
from .scan_for_music import scan_for_music
|
||
|
from .load_tag_information import load_tag_information
|
||
|
|
||
|
from os import makedirs as make_directories
|
||
|
from os.path import exists as path_exists
|
||
|
from shutil import copy as copy_file
|
||
|
from copy import deepcopy as deep_copy
|
||
|
from subprocess import run as run_command
|
||
|
|
||
|
class CopyCommandState:
|
||
|
files: list[File] = []
|
||
|
transcoded_files: list[File] = []
|
||
|
|
||
|
class CopyCommand():
|
||
|
def __init__(self,
|
||
|
src: str,
|
||
|
dest: str,
|
||
|
transcode_level: str,
|
||
|
single_directory: bool,
|
||
|
skip_existing: bool
|
||
|
):
|
||
|
self.src = src
|
||
|
self.dest = dest
|
||
|
self.transcode_level = transcode_level
|
||
|
self.single_directory = single_directory
|
||
|
self.skip_existing = skip_existing
|
||
|
self.state = CopyCommandState()
|
||
|
|
||
|
def run(self):
|
||
|
print("Copying")
|
||
|
self.scan_for_music()
|
||
|
self.load_tag_information()
|
||
|
if self.single_directory:
|
||
|
self.check_for_collisions()
|
||
|
self.transcode_files()
|
||
|
|
||
|
def scan_for_music(self):
|
||
|
print("Scanning For Music")
|
||
|
self.state.files = scan_for_music(self.src)
|
||
|
|
||
|
def load_tag_information(self):
|
||
|
print("Loading Tag Information")
|
||
|
|
||
|
for file in self.state.files:
|
||
|
file.tags = load_tag_information(file)
|
||
|
|
||
|
def check_for_collisions(self):
|
||
|
print("Checking For Colisions")
|
||
|
seen = set()
|
||
|
dupes = []
|
||
|
|
||
|
for file in self.state.files:
|
||
|
filename = file.filename
|
||
|
if filename in seen:
|
||
|
dupes.append(filename)
|
||
|
else:
|
||
|
seen.add(filename)
|
||
|
|
||
|
if len(dupes) > 0:
|
||
|
print("Dupes Found:", dupes)
|
||
|
print("Cannot continue using --single-directory")
|
||
|
print("Please rename or remove duplicates")
|
||
|
exit()
|
||
|
|
||
|
def _transcode_copy(self, file: File):
|
||
|
src = file.join_path_to()
|
||
|
dest = file.join_filename() if self.single_directory else file.join_path_from_src()
|
||
|
dest = self.dest + "/" + dest
|
||
|
|
||
|
exists = path_exists(dest)
|
||
|
|
||
|
if (self.skip_existing and not exists) or not self.skip_existing:
|
||
|
print("Copying", src, "to", dest)
|
||
|
copy_file(
|
||
|
src,
|
||
|
dest,
|
||
|
)
|
||
|
else:
|
||
|
print("Skipping", src, "as already is copied at", dest)
|
||
|
|
||
|
self.state.transcoded_files.append(file)
|
||
|
|
||
|
def _transcode_with_level(self, file: File, level: str):
|
||
|
transcoded_file_extension = "opus"
|
||
|
|
||
|
src = file.join_path_to()
|
||
|
|
||
|
new_file = deep_copy(file)
|
||
|
new_file.extension = transcoded_file_extension
|
||
|
|
||
|
dest_filepath = new_file.join_filename() if self.single_directory else new_file.join_path_from_src()
|
||
|
dest_filepath = self.dest + "/" + dest_filepath
|
||
|
|
||
|
if (self.skip_existing and path_exists(dest_filepath)):
|
||
|
print("Skipping transcoding", dest_filepath)
|
||
|
self.state.transcoded_files.append(new_file)
|
||
|
return
|
||
|
|
||
|
bitrate = ""
|
||
|
|
||
|
if self.transcode_level == "high":
|
||
|
bitrate = "128K"
|
||
|
elif self.transcode_level == "medium":
|
||
|
bitrate = "96K"
|
||
|
elif self.transcode_level == "low":
|
||
|
bitrate = "64K"
|
||
|
|
||
|
print("Transcoding", src, "to", dest_filepath)
|
||
|
|
||
|
title = file.tags.title
|
||
|
artist = file.tags.artist
|
||
|
|
||
|
# TODO: check for errors
|
||
|
run_command([
|
||
|
"ffmpeg",
|
||
|
"-y",
|
||
|
"-hide_banner",
|
||
|
"-loglevel", "warning",
|
||
|
"-i", src,
|
||
|
"-c:a", "libopus",
|
||
|
"-b:a", bitrate,
|
||
|
"-metadata", f"title=\"{title}\"",
|
||
|
"-metadata", f"artist=\"{artist}\"",
|
||
|
dest_filepath
|
||
|
])
|
||
|
|
||
|
self.state.transcoded_files.append(new_file)
|
||
|
|
||
|
def transcode_files(self):
|
||
|
print("Transcoding Files")
|
||
|
|
||
|
if not self.single_directory:
|
||
|
directories = set()
|
||
|
for file in self.state.files:
|
||
|
directories.add(file.path_from_src)
|
||
|
for dir in directories:
|
||
|
make_directories(self.dest + "/" + dir, exist_ok=True)
|
||
|
|
||
|
if self.transcode_level == "copy":
|
||
|
for file in self.state.files:
|
||
|
self._transcode_copy(file)
|
||
|
return
|
||
|
elif self.transcode_level in ["high", "medium", "low"]:
|
||
|
for file in self.state.files:
|
||
|
self._transcode_with_level(file, self.transcode_level)
|
||
|
|