format code
This commit is contained in:
parent
5272a3d356
commit
1c57673dc4
27
main.py
27
main.py
|
@ -1,27 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
|
||||
from musicutil.commands.process_command import ProcessCommand
|
||||
from musicutil.commands.copy_command import CopyCommand
|
||||
|
||||
parser = argparse.ArgumentParser(description='Highly Opinionated Music ')
|
||||
subparsers = parser.add_subparsers(dest="subparser_name")
|
||||
|
||||
process_parser = subparsers.add_parser('process')
|
||||
process_parser.add_argument('src', type=str, help='src base music directory')
|
||||
process_parser.add_argument('--dry-run', action='store_true')
|
||||
|
||||
copy_parser = subparsers.add_parser('copy')
|
||||
copy_parser.add_argument('src', type=str, help='src base music directory')
|
||||
copy_parser.add_argument('dest', type=str, help='dest music directory')
|
||||
copy_parser.add_argument('--transcode-level', type=str, help='transcode level', default="copy")
|
||||
copy_parser.add_argument('--skip-existing', action='store_true')
|
||||
copy_parser.add_argument('--single-directory', action='store_true')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.subparser_name == "process":
|
||||
ProcessCommand(args.src, args.dry_run).run()
|
||||
elif args.subparser_name == "copy":
|
||||
CopyCommand(args.src, args.dest, args.transcode_level, args.single_directory, args.skip_existing).run()
|
52
musicutil/__main__.py
Normal file
52
musicutil/__main__.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
|
||||
from .commands.process_command import ProcessCommand
|
||||
from .commands.copy_command import CopyCommand
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="chaos's musicutil")
|
||||
subparsers = parser.add_subparsers(dest="subparser_name")
|
||||
|
||||
process_parser = subparsers.add_parser('process')
|
||||
process_parser.add_argument(
|
||||
'src',
|
||||
type=str,
|
||||
help='src base music directory')
|
||||
process_parser.add_argument(
|
||||
'--dry-run', action='store_true')
|
||||
|
||||
copy_parser = subparsers.add_parser('copy')
|
||||
copy_parser.add_argument(
|
||||
'src',
|
||||
type=str,
|
||||
help='src base music directory')
|
||||
copy_parser.add_argument(
|
||||
'dest',
|
||||
type=str,
|
||||
help='dest music directory')
|
||||
copy_parser.add_argument(
|
||||
'--transcode-level',
|
||||
type=str,
|
||||
help='transcode level',
|
||||
default="copy")
|
||||
copy_parser.add_argument(
|
||||
'--skip-existing',
|
||||
action='store_true')
|
||||
copy_parser.add_argument(
|
||||
'--single-directory',
|
||||
action='store_true')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.subparser_name == "process":
|
||||
ProcessCommand(args.src, args.dry_run).run()
|
||||
elif args.subparser_name == "copy":
|
||||
CopyCommand(
|
||||
args.src,
|
||||
args.dest,
|
||||
args.transcode_level,
|
||||
args.single_directory,
|
||||
args.skip_existing
|
||||
).run()
|
|
@ -8,140 +8,149 @@ from shutil import copy as copy_file
|
|||
from copy import deepcopy as deep_copy
|
||||
from subprocess import run as run_command
|
||||
|
||||
|
||||
class CopyCommandState:
|
||||
files: list[File] = []
|
||||
transcoded_files: list[File] = []
|
||||
files: list[File] = []
|
||||
transcoded_files: list[File] = []
|
||||
|
||||
class CopyCommand():
|
||||
def __init__(self,
|
||||
src: str,
|
||||
dest: str,
|
||||
transcode_level: str,
|
||||
single_directory: bool,
|
||||
skip_existing: bool
|
||||
):
|
||||
self.src = src
|
||||
self.dest = dest
|
||||
self.transcode_level = transcode_level
|
||||
self.single_directory = single_directory
|
||||
self.skip_existing = skip_existing
|
||||
self.state = CopyCommandState()
|
||||
|
||||
def run(self):
|
||||
print("Copying")
|
||||
self.scan_for_music()
|
||||
self.load_tag_information()
|
||||
if self.single_directory:
|
||||
self.check_for_collisions()
|
||||
self.transcode_files()
|
||||
class CopyCommand():
|
||||
def __init__(self,
|
||||
src: str,
|
||||
dest: str,
|
||||
transcode_level: str,
|
||||
single_directory: bool,
|
||||
skip_existing: bool
|
||||
):
|
||||
self.src = src
|
||||
self.dest = dest
|
||||
self.transcode_level = transcode_level
|
||||
self.single_directory = single_directory
|
||||
self.skip_existing = skip_existing
|
||||
self.state = CopyCommandState()
|
||||
|
||||
def scan_for_music(self):
|
||||
print("Scanning For Music")
|
||||
self.state.files = scan_for_music(self.src)
|
||||
def run(self):
|
||||
print("Copying")
|
||||
self.scan_for_music()
|
||||
self.load_tag_information()
|
||||
if self.single_directory:
|
||||
self.check_for_collisions()
|
||||
self.transcode_files()
|
||||
|
||||
def load_tag_information(self):
|
||||
print("Loading Tag Information")
|
||||
def scan_for_music(self):
|
||||
print("Scanning For Music")
|
||||
self.state.files = scan_for_music(self.src)
|
||||
|
||||
for file in self.state.files:
|
||||
file.tags = load_tag_information(file)
|
||||
|
||||
def check_for_collisions(self):
|
||||
print("Checking For Colisions")
|
||||
seen = set()
|
||||
dupes = []
|
||||
def load_tag_information(self):
|
||||
print("Loading Tag Information")
|
||||
|
||||
for file in self.state.files:
|
||||
filename = file.filename
|
||||
if filename in seen:
|
||||
dupes.append(filename)
|
||||
else:
|
||||
seen.add(filename)
|
||||
for file in self.state.files:
|
||||
file.tags = load_tag_information(file)
|
||||
|
||||
if len(dupes) > 0:
|
||||
print("Dupes Found:", dupes)
|
||||
print("Cannot continue using --single-directory")
|
||||
print("Please rename or remove duplicates")
|
||||
exit()
|
||||
def check_for_collisions(self):
|
||||
print("Checking For Colisions")
|
||||
seen = set()
|
||||
dupes = []
|
||||
|
||||
def _transcode_copy(self, file: File):
|
||||
src = file.join_path_to()
|
||||
dest = file.join_filename() if self.single_directory else file.join_path_from_src()
|
||||
dest = self.dest + "/" + dest
|
||||
for file in self.state.files:
|
||||
filename = file.filename
|
||||
if filename in seen:
|
||||
dupes.append(filename)
|
||||
else:
|
||||
seen.add(filename)
|
||||
|
||||
exists = path_exists(dest)
|
||||
if len(dupes) > 0:
|
||||
print("Dupes Found:", dupes)
|
||||
print("Cannot continue using --single-directory")
|
||||
print("Please rename or remove duplicates")
|
||||
exit()
|
||||
|
||||
if (self.skip_existing and not exists) or not self.skip_existing:
|
||||
print("Copying", src, "to", dest)
|
||||
copy_file(
|
||||
src,
|
||||
dest,
|
||||
)
|
||||
else:
|
||||
print("Skipping", src, "as already is copied at", dest)
|
||||
|
||||
self.state.transcoded_files.append(file)
|
||||
def _transcode_copy(self, file: File):
|
||||
src = file.join_path_to()
|
||||
dest = file.join_filename(
|
||||
) if self.single_directory else file.join_path_from_src()
|
||||
dest = self.dest + "/" + dest
|
||||
|
||||
def _transcode_with_level(self, file: File, level: str):
|
||||
transcoded_file_extension = "opus"
|
||||
exists = path_exists(dest)
|
||||
|
||||
src = file.join_path_to()
|
||||
if (self.skip_existing and not exists) or not self.skip_existing:
|
||||
print("Copying", src, "to", dest)
|
||||
copy_file(
|
||||
src,
|
||||
dest,
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"Skipping",
|
||||
src,
|
||||
"as already is copied at",
|
||||
dest)
|
||||
|
||||
new_file = deep_copy(file)
|
||||
new_file.extension = transcoded_file_extension
|
||||
|
||||
dest_filepath = new_file.join_filename() if self.single_directory else new_file.join_path_from_src()
|
||||
dest_filepath = self.dest + "/" + dest_filepath
|
||||
self.state.transcoded_files.append(file)
|
||||
|
||||
if (self.skip_existing and path_exists(dest_filepath)):
|
||||
print("Skipping transcoding", dest_filepath)
|
||||
self.state.transcoded_files.append(new_file)
|
||||
return
|
||||
|
||||
bitrate = ""
|
||||
def _transcode_with_level(self, file: File, level: str):
|
||||
transcoded_file_extension = "opus"
|
||||
|
||||
if self.transcode_level == "high":
|
||||
bitrate = "128K"
|
||||
elif self.transcode_level == "medium":
|
||||
bitrate = "96K"
|
||||
elif self.transcode_level == "low":
|
||||
bitrate = "64K"
|
||||
src = file.join_path_to()
|
||||
|
||||
print("Transcoding", src, "to", dest_filepath)
|
||||
new_file = deep_copy(file)
|
||||
new_file.extension = transcoded_file_extension
|
||||
|
||||
title = file.tags.title
|
||||
artist = file.tags.artist
|
||||
dest_filepath = new_file.join_filename(
|
||||
) if self.single_directory else new_file.join_path_from_src()
|
||||
dest_filepath = self.dest + "/" + dest_filepath
|
||||
|
||||
# TODO: check for errors
|
||||
run_command([
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-hide_banner",
|
||||
"-loglevel", "warning",
|
||||
"-i", src,
|
||||
"-c:a", "libopus",
|
||||
"-b:a", bitrate,
|
||||
"-metadata", f"title=\"{title}\"",
|
||||
"-metadata", f"artist=\"{artist}\"",
|
||||
dest_filepath
|
||||
])
|
||||
if (self.skip_existing and path_exists(dest_filepath)):
|
||||
print("Skipping transcoding", dest_filepath)
|
||||
self.state.transcoded_files.append(new_file)
|
||||
return
|
||||
|
||||
self.state.transcoded_files.append(new_file)
|
||||
|
||||
def transcode_files(self):
|
||||
print("Transcoding Files")
|
||||
|
||||
if not self.single_directory:
|
||||
directories = set()
|
||||
for file in self.state.files:
|
||||
directories.add(file.path_from_src)
|
||||
for dir in directories:
|
||||
make_directories(self.dest + "/" + dir, exist_ok=True)
|
||||
bitrate = ""
|
||||
|
||||
if self.transcode_level == "copy":
|
||||
for file in self.state.files:
|
||||
self._transcode_copy(file)
|
||||
return
|
||||
elif self.transcode_level in ["high", "medium", "low"]:
|
||||
for file in self.state.files:
|
||||
self._transcode_with_level(file, self.transcode_level)
|
||||
if self.transcode_level == "high":
|
||||
bitrate = "128K"
|
||||
elif self.transcode_level == "medium":
|
||||
bitrate = "96K"
|
||||
elif self.transcode_level == "low":
|
||||
bitrate = "64K"
|
||||
|
||||
print("Transcoding", src, "to", dest_filepath)
|
||||
|
||||
title = file.tags.title
|
||||
artist = file.tags.artist
|
||||
|
||||
# TODO: check for errors
|
||||
run_command([
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-hide_banner",
|
||||
"-loglevel", "warning",
|
||||
"-i", src,
|
||||
"-c:a", "libopus",
|
||||
"-b:a", bitrate,
|
||||
"-metadata", f"title=\"{title}\"",
|
||||
"-metadata", f"artist=\"{artist}\"",
|
||||
dest_filepath
|
||||
])
|
||||
|
||||
self.state.transcoded_files.append(new_file)
|
||||
|
||||
def transcode_files(self):
|
||||
print("Transcoding Files")
|
||||
|
||||
if not self.single_directory:
|
||||
directories = set()
|
||||
for file in self.state.files:
|
||||
directories.add(file.path_from_src)
|
||||
for dir in directories:
|
||||
make_directories(
|
||||
self.dest + "/" + dir, exist_ok=True)
|
||||
|
||||
if self.transcode_level == "copy":
|
||||
for file in self.state.files:
|
||||
self._transcode_copy(file)
|
||||
return
|
||||
elif self.transcode_level in ["high", "medium", "low"]:
|
||||
for file in self.state.files:
|
||||
self._transcode_with_level(
|
||||
file, self.transcode_level)
|
||||
|
|
|
@ -6,57 +6,63 @@ from ..utils.substitutions import reduce_to_ascii_and_substitute
|
|||
from copy import deepcopy as deep_copy
|
||||
from os import rename as rename_file
|
||||
|
||||
|
||||
class ProcessCommandState:
|
||||
files: list[File] = []
|
||||
files: list[File] = []
|
||||
|
||||
class ProcessCommand():
|
||||
def __init__(self, src: str, dry_run: bool):
|
||||
self.src = src
|
||||
self.dry_run = dry_run
|
||||
self.state = ProcessCommandState()
|
||||
|
||||
def run(self):
|
||||
print("Renaming")
|
||||
self.scan_for_music()
|
||||
self.load_tag_information()
|
||||
self.rename_files()
|
||||
class ProcessCommand():
|
||||
def __init__(self, src: str, dry_run: bool):
|
||||
self.src = src
|
||||
self.dry_run = dry_run
|
||||
self.state = ProcessCommandState()
|
||||
|
||||
def scan_for_music(self):
|
||||
print("Scanning For Music")
|
||||
self.state.files = scan_for_music(self.src)
|
||||
def run(self):
|
||||
print("Renaming")
|
||||
self.scan_for_music()
|
||||
self.load_tag_information()
|
||||
self.rename_files()
|
||||
|
||||
def load_tag_information(self):
|
||||
print("Loading Tag Information")
|
||||
def scan_for_music(self):
|
||||
print("Scanning For Music")
|
||||
self.state.files = scan_for_music(self.src)
|
||||
|
||||
for file in self.state.files:
|
||||
tags = load_tag_information(file)
|
||||
file.tags = tags
|
||||
|
||||
def load_tag_information(self):
|
||||
print("Loading Tag Information")
|
||||
|
||||
def _rename_file(self, file: File) -> File:
|
||||
filename = file.filename
|
||||
artist = file.tags.artist.replace("\n", "")
|
||||
title = file.tags.title.replace("\n", "")
|
||||
for file in self.state.files:
|
||||
tags = load_tag_information(file)
|
||||
file.tags = tags
|
||||
|
||||
proper_filename = reduce_to_ascii_and_substitute(f"{artist} - {title}")
|
||||
def _rename_file(self, file: File) -> File:
|
||||
filename = file.filename
|
||||
artist = file.tags.artist.replace("\n", "")
|
||||
title = file.tags.title.replace("\n", "")
|
||||
|
||||
if filename != proper_filename:
|
||||
print(f"Renaming \"{filename}\"", "to" + f"\"{proper_filename}\"" + "\n")
|
||||
proper_filename = reduce_to_ascii_and_substitute(
|
||||
f"{artist} - {title}")
|
||||
|
||||
new_file = deep_copy(file)
|
||||
new_file.filename = proper_filename
|
||||
if filename != proper_filename:
|
||||
print(f"Renaming \"{filename}\"", "to" +
|
||||
f"\"{proper_filename}\"" + "\n")
|
||||
|
||||
if not self.dry_run:
|
||||
rename_file(file.join_path_to(), new_file.join_path_to())
|
||||
# so that other steps after read the new file and not
|
||||
# the orig pre-rename file when not dry run
|
||||
return new_file
|
||||
else:
|
||||
return file
|
||||
new_file = deep_copy(file)
|
||||
new_file.filename = proper_filename
|
||||
|
||||
def rename_files(self):
|
||||
print("Renaming files")
|
||||
if not self.dry_run:
|
||||
rename_file(
|
||||
file.join_path_to(),
|
||||
new_file.join_path_to())
|
||||
# so that other steps after read the new file and not
|
||||
# the orig pre-rename file when not dry run
|
||||
return new_file
|
||||
else:
|
||||
return file
|
||||
|
||||
for file in self.state.files:
|
||||
index = self.state.files.index(file)
|
||||
self.state.files[index] = self._rename_file(file)
|
||||
def rename_files(self):
|
||||
print("Renaming files")
|
||||
|
||||
for file in self.state.files:
|
||||
index = self.state.files.index(file)
|
||||
self.state.files[index] = self._rename_file(
|
||||
file)
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
# All file extensions that are supported and have tag
|
||||
# extraction
|
||||
supported_formats = ["mp3", "flac"]
|
||||
|
||||
sub_char = "_"
|
||||
|
||||
substitutions = {
|
||||
"α": "a",
|
||||
}
|
||||
"α": "a",
|
||||
}
|
||||
|
|
|
@ -1,43 +1,44 @@
|
|||
class Tags:
|
||||
title = ""
|
||||
artist = ""
|
||||
title = ""
|
||||
artist = ""
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"title": self.title,
|
||||
"artist": self.artist
|
||||
}
|
||||
def to_dict(self):
|
||||
return {
|
||||
"title": self.title,
|
||||
"artist": self.artist
|
||||
}
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.to_dict())
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.to_dict())
|
||||
|
||||
class File:
|
||||
filename = ""
|
||||
extension = ""
|
||||
# The path to the actual file
|
||||
path_to = ""
|
||||
# The path relative to the source directory
|
||||
path_from_src = ""
|
||||
filename = ""
|
||||
extension = ""
|
||||
# The path to the actual file
|
||||
path_to = ""
|
||||
# The path relative to the source directory
|
||||
path_from_src = ""
|
||||
|
||||
tags = Tags()
|
||||
tags = Tags()
|
||||
|
||||
def join_filename(self):
|
||||
return f"{self.filename}.{self.extension}"
|
||||
def join_filename(self):
|
||||
return f"{self.filename}.{self.extension}"
|
||||
|
||||
def join_path_to(self):
|
||||
return f"{self.path_to}/{self.filename}.{self.extension}"
|
||||
def join_path_to(self):
|
||||
return f"{self.path_to}/{self.filename}.{self.extension}"
|
||||
|
||||
def join_path_from_src(self):
|
||||
return f"{self.path_from_src}/{self.filename}.{self.extension}"
|
||||
def join_path_from_src(self):
|
||||
return f"{self.path_from_src}/{self.filename}.{self.extension}"
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"filename": self.filename,
|
||||
"extension": self.extension,
|
||||
"path_to": self.path_to,
|
||||
"path_from_src": self.path_from_src,
|
||||
"tags": self.tags.to_dict(),
|
||||
}
|
||||
def to_dict(self):
|
||||
return {
|
||||
"filename": self.filename,
|
||||
"extension": self.extension,
|
||||
"path_to": self.path_to,
|
||||
"path_from_src": self.path_from_src,
|
||||
"tags": self.tags.to_dict(),
|
||||
}
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.to_dict())
|
||||
def __repr__(self):
|
||||
return repr(self.to_dict())
|
||||
|
|
|
@ -3,18 +3,19 @@ from ..types import File, Tags
|
|||
from mutagen.mp3 import EasyMP3 as MP3
|
||||
from mutagen.flac import FLAC
|
||||
|
||||
|
||||
def load_tag_information(file: File) -> Tags:
|
||||
path = file.join_path_to()
|
||||
tags = Tags()
|
||||
if file.extension == "mp3":
|
||||
mp3 = MP3(path)
|
||||
tags.title = mp3["title"][0]
|
||||
tags.artist = mp3["artist"][0]
|
||||
elif file.extension == "flac":
|
||||
flac = FLAC(path)
|
||||
tags.title = flac["title"][0]
|
||||
tags.artist = flac["artist"][0]
|
||||
else:
|
||||
print("Invalid / Unsupported Container")
|
||||
exit()
|
||||
return tags
|
||||
path = file.join_path_to()
|
||||
tags = Tags()
|
||||
if file.extension == "mp3":
|
||||
mp3 = MP3(path)
|
||||
tags.title = mp3["title"][0]
|
||||
tags.artist = mp3["artist"][0]
|
||||
elif file.extension == "flac":
|
||||
flac = FLAC(path)
|
||||
tags.title = flac["title"][0]
|
||||
tags.artist = flac["artist"][0]
|
||||
else:
|
||||
print("Invalid / Unsupported Container")
|
||||
exit()
|
||||
return tags
|
||||
|
|
|
@ -4,14 +4,16 @@ from os.path import relpath
|
|||
from ..types import File
|
||||
from ..meta import supported_formats
|
||||
|
||||
|
||||
def scan_for_music(src: str) -> list[File]:
|
||||
files: list[File] = []
|
||||
for format in supported_formats:
|
||||
for path in Path(src).rglob("*." + format):
|
||||
file = File()
|
||||
file.path_to = str(path.parent)
|
||||
file.path_from_src = relpath(str(path.parent), src)
|
||||
file.filename = path.stem
|
||||
file.extension = path.suffix.replace(".", "")
|
||||
files.append(file)
|
||||
return files
|
||||
files: list[File] = []
|
||||
for format in supported_formats:
|
||||
for path in Path(src).rglob("*." + format):
|
||||
file = File()
|
||||
file.path_to = str(path.parent)
|
||||
file.path_from_src = relpath(
|
||||
str(path.parent), src)
|
||||
file.filename = path.stem
|
||||
file.extension = path.suffix.replace(".", "")
|
||||
files.append(file)
|
||||
return files
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
from ..meta import sub_char, substitutions
|
||||
from fold_to_ascii import fold
|
||||
|
||||
def reduce_to_ascii_and_substitute(filename: str):
|
||||
filename = filename.replace("/", sub_char)
|
||||
filename = filename.replace("\\", sub_char)
|
||||
filename = filename.replace("\n", "")
|
||||
for sub_before in substitutions.keys():
|
||||
filename = filename.replace(sub_before, substitutions[sub_before])
|
||||
filename = fold(filename)
|
||||
return filename
|
||||
|
||||
def reduce_to_ascii_and_substitute(filename: str):
|
||||
filename = filename.replace("/", sub_char)
|
||||
filename = filename.replace("\\", sub_char)
|
||||
filename = filename.replace("\n", "")
|
||||
for sub_before in substitutions.keys():
|
||||
filename = filename.replace(
|
||||
sub_before, substitutions[sub_before])
|
||||
filename = fold(filename)
|
||||
return filename
|
||||
|
|
5
pyproject.toml
Normal file
5
pyproject.toml
Normal file
|
@ -0,0 +1,5 @@
|
|||
[tool.autopep8]
|
||||
max_line_length = 60
|
||||
in-place = true
|
||||
recursive = true
|
||||
aggressive = 3
|
|
@ -2,6 +2,6 @@
|
|||
let
|
||||
fold-to-ascii = (py: py.callPackage ./nix-extra-deps/fold-to-ascii.nix { });
|
||||
my_python = pkgs.python39.withPackages
|
||||
(py: with py; [ py.mutagen (fold-to-ascii py) ]);
|
||||
(py: with py; [ py.mutagen (fold-to-ascii py) py.autopep8 ]);
|
||||
|
||||
in pkgs.mkShell { packages = with pkgs; [ my_python ffmpeg ]; }
|
||||
|
|
Loading…
Reference in a new issue