From da910912f01ef4da06d2c09b3a4f252008b85b7e Mon Sep 17 00:00:00 2001 From: ChaotiCryptidz Date: Sat, 22 Oct 2022 14:35:31 +0100 Subject: [PATCH] tidy progress_monitor to file --- src/utils/transcoder/mod.rs | 4 +- src/utils/transcoder/progress_monitor.rs | 135 ++++++++++++++++++++ src/utils/transcoder/transcoder.rs | 155 +++-------------------- 3 files changed, 153 insertions(+), 141 deletions(-) create mode 100644 src/utils/transcoder/progress_monitor.rs diff --git a/src/utils/transcoder/mod.rs b/src/utils/transcoder/mod.rs index 82bd1fd..d6e58f7 100644 --- a/src/utils/transcoder/mod.rs +++ b/src/utils/transcoder/mod.rs @@ -1,4 +1,6 @@ pub mod presets; pub mod types; +pub mod progress_monitor; #[allow(clippy::all)] mod transcoder; -pub use self::transcoder::transcode; \ No newline at end of file +pub use self::transcoder::transcode; +pub(self) use self::progress_monitor::progress_monitor; \ No newline at end of file diff --git a/src/utils/transcoder/progress_monitor.rs b/src/utils/transcoder/progress_monitor.rs new file mode 100644 index 0000000..8fa6919 --- /dev/null +++ b/src/utils/transcoder/progress_monitor.rs @@ -0,0 +1,135 @@ +use std::{ + fs, + io::{BufRead, BufReader, Seek, SeekFrom}, + path::PathBuf, + process::Command, + sync::mpsc::{self, Sender}, + thread::{self, JoinHandle}, + time::Duration, +}; + +use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; +use serde::Deserialize; +use string_error::static_err; + + +#[derive(Debug, Clone, Deserialize)] +struct FFProbeOutput { + pub format: FFProbeFormat, +} + +#[derive(Debug, Clone, Deserialize)] +struct FFProbeFormat { + pub duration: String, +} + +fn get_file_length_milliseconds( + source_filepath: PathBuf, +) -> Result> { + let output = Command::new(crate::meta::FFPROBE) + .args([ + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + &source_filepath.to_string_lossy(), + ]) + .output()?; + + if !output.status.success() { + print!("{:?}", String::from_utf8(output.stderr).unwrap()); + return Err(static_err("FFprobe Crashed")); + } + + let output_str = String::from_utf8(output.stdout).unwrap(); + let ffprobe_out: FFProbeOutput = serde_json::from_str(output_str.as_str())?; + + let duration_seconds = ffprobe_out.format.duration.parse::()?; + + Ok((duration_seconds * 1000.0).round() as u64) +} + +fn ffprobe_duration_to_ms(duration: String) -> Result> { + let fields: Vec<&str> = duration.split(':').collect(); + let mut duration = Duration::from_nanos(0); + + let hours = fields[0].parse::()?; + duration += Duration::from_secs(hours * 60 * 60); + + let minutes = fields[1].parse::()?; + duration += Duration::from_secs(minutes * 60); + + let seconds = fields[1].parse::()?; + duration += Duration::from_millis((seconds * 1000.0) as u64); + + Ok(duration.as_millis() as u64) +} + +pub fn progress_monitor( + source_filepath: PathBuf, + sender_base: &Sender, +) -> Result<(String, JoinHandle<()>), Box> { + let total_length_millis = get_file_length_milliseconds(source_filepath)?; + + let tempdir = tempfile::tempdir()?; + let file_path = tempdir.path().join("progress.log"); + let file_path_string = file_path.to_str().unwrap().to_string(); + fs::File::create(&file_path)?; + + let sender = sender_base.clone(); + let child = thread::spawn(move || { + let _ = &tempdir; + + let (tx, rx) = mpsc::channel(); + let mut watcher = watcher(tx, Duration::from_millis(100)).unwrap(); + watcher + .watch(&file_path, RecursiveMode::NonRecursive) + .unwrap(); + + let mut pos = 0; + + 'outer: loop { + match rx.recv() { + Ok(DebouncedEvent::Write(_)) => { + let mut file = fs::File::open(&file_path).unwrap(); + file.seek(SeekFrom::Start(pos)).unwrap(); + + pos = file.metadata().unwrap().len(); + + let reader = BufReader::new(file); + for line in reader.lines() { + let ln = line.unwrap(); + + if ln == "progress=end" { + break 'outer; + } + + if ln.starts_with("out_time=") { + let out_time = ln.strip_prefix("out_time=").unwrap().to_string(); + let out_time_ms = ffprobe_duration_to_ms(out_time).unwrap(); + if sender + .send(format!( + "{:.2}%", + ((out_time_ms as f64 / total_length_millis as f64) * 100.0) + )) + .is_err() + { + break 'outer; + }; + } + } + } + Ok(DebouncedEvent::NoticeRemove(_)) => { + break 'outer; + } + Ok(_) => {} + Err(_) => { + break 'outer; + } + } + } + }); + + Ok((file_path_string, child)) +} \ No newline at end of file diff --git a/src/utils/transcoder/transcoder.rs b/src/utils/transcoder/transcoder.rs index 94e9423..ab73d54 100644 --- a/src/utils/transcoder/transcoder.rs +++ b/src/utils/transcoder/transcoder.rs @@ -1,141 +1,16 @@ use std::{ fs, - io::{BufRead, BufReader, Seek, SeekFrom}, - path::PathBuf, process::Command, - sync::mpsc::{self, Sender}, - thread::{self, JoinHandle}, - time::Duration, + sync::mpsc::{Sender}, + thread::{JoinHandle}, }; -use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use crate::types::File; -use serde::Deserialize; use string_error::static_err; -use super::types::TranscodeConfig; +use super::{progress_monitor, types::TranscodeConfig}; -#[derive(Debug, Clone, Deserialize)] -struct FFProbeOutput { - pub format: FFProbeFormat, -} - -#[derive(Debug, Clone, Deserialize)] -struct FFProbeFormat { - pub duration: String, -} - -fn get_file_length_milliseconds( - source_filepath: PathBuf, -) -> Result> { - let output = Command::new(crate::meta::FFPROBE) - .args([ - "-v", - "quiet", - "-print_format", - "json", - "-show_format", - &source_filepath.to_string_lossy(), - ]) - .output()?; - - if !output.status.success() { - print!("{:?}", String::from_utf8(output.stderr).unwrap()); - return Err(static_err("FFprobe Crashed")); - } - - let output_str = String::from_utf8(output.stdout).unwrap(); - let ffprobe_out: FFProbeOutput = serde_json::from_str(output_str.as_str())?; - - let duration_seconds = ffprobe_out.format.duration.parse::()?; - - Ok((duration_seconds * 1000.0).round() as u64) -} - -fn ffprobe_duration_to_ms(duration: String) -> Result> { - let fields: Vec<&str> = duration.split(':').collect(); - let mut duration = Duration::from_nanos(0); - - let hours = fields[0].parse::()?; - duration += Duration::from_secs(hours * 60 * 60); - - let minutes = fields[1].parse::()?; - duration += Duration::from_secs(minutes * 60); - - let seconds = fields[1].parse::()?; - duration += Duration::from_millis((seconds * 1000.0) as u64); - - Ok(duration.as_millis() as u64) -} - -fn progress_monitor( - source_filepath: PathBuf, - sender_base: &Sender, -) -> Result<(String, JoinHandle<()>), Box> { - let total_length_millis = get_file_length_milliseconds(source_filepath)?; - - let tempdir = tempfile::tempdir()?; - let file_path = tempdir.path().join("progress.log"); - let file_path_string = file_path.to_str().unwrap().to_string(); - fs::File::create(&file_path)?; - - let sender = sender_base.clone(); - let child = thread::spawn(move || { - let _ = &tempdir; - - let (tx, rx) = mpsc::channel(); - let mut watcher = watcher(tx, Duration::from_millis(100)).unwrap(); - watcher - .watch(&file_path, RecursiveMode::NonRecursive) - .unwrap(); - - let mut pos = 0; - - 'outer: loop { - match rx.recv() { - Ok(DebouncedEvent::Write(_)) => { - let mut file = fs::File::open(&file_path).unwrap(); - file.seek(SeekFrom::Start(pos)).unwrap(); - - pos = file.metadata().unwrap().len(); - - let reader = BufReader::new(file); - for line in reader.lines() { - let ln = line.unwrap(); - - if ln == "progress=end" { - break 'outer; - } - - if ln.starts_with("out_time=") { - let out_time = ln.strip_prefix("out_time=").unwrap().to_string(); - let out_time_ms = ffprobe_duration_to_ms(out_time).unwrap(); - if sender - .send(format!( - "{:.2}%", - ((out_time_ms as f64 / total_length_millis as f64) * 100.0) - )) - .is_err() - { - break 'outer; - }; - } - } - } - Ok(DebouncedEvent::NoticeRemove(_)) => { - break 'outer; - } - Ok(_) => {} - Err(_) => { - break 'outer; - } - } - } - }); - - Ok((file_path_string, child)) -} pub fn transcode( file: File, @@ -151,28 +26,28 @@ pub fn transcode( file.join_path_to().to_string_lossy().to_string(), ]); - if config.encoder.is_some() { - command_args.extend(vec!["-c:a".to_string(), config.encoder.unwrap()]); + if let Some(encoder) = config.encoder { + command_args.extend(vec!["-c:a".to_string(), encoder]); } - if config.container.is_some() { - command_args.extend(vec!["-f".to_string(), config.container.unwrap()]); + if let Some(container) = config.container { + command_args.extend(vec!["-f".to_string(), container]); } - if config.sample_rate.is_some() { - command_args.extend(vec!["-ar".to_string(), config.sample_rate.unwrap()]); + if let Some(sample_rate) = config.sample_rate { + command_args.extend(vec!["-ar".to_string(), sample_rate]); } - if config.channels.is_some() { - command_args.extend(vec!["-ac".to_string(), config.channels.unwrap()]); + if let Some(channels) = config.channels { + command_args.extend(vec!["-ac".to_string(), channels]); } - if config.quality.is_some() { - command_args.extend(vec!["-q:a".to_string(), config.quality.unwrap()]); + if let Some(quality) = config.quality { + command_args.extend(vec!["-q:a".to_string(), quality]); } - if config.bitrate.is_some() { - command_args.extend(vec!["-b:a".to_string(), config.bitrate.unwrap()]); + if let Some(bitrate) = config.bitrate { + command_args.extend(vec!["-b:a".to_string(), bitrate]); } command_args.push(dest);