blob: f539388c79273e445554a112f3890bb67757ab81 [file] [log] [blame]
use avm_analyzer_common::{AvmStreamInfo, DecodeProgress, DecodeState, ProgressResponse, StartDecodeResponse};
use egui::DroppedFile;
use log::{info, warn};
use rand::distributions::Alphanumeric;
use rand::Rng;
use rfd::AsyncFileDialog;
use std::collections::HashMap;
use std::io::{self, Cursor, Write};
use std::sync::{Arc, Mutex};
use wasm_bindgen_futures::spawn_local;
use web_time::Instant;
use crate::stream::{stream_name_from_file_name, Stream};
use super::HttpStreamManager;
pub const HTTP_POLL_PERIOD_SECONDS: f32 = 1.0;
pub const MAX_RETRIES: i32 = 5;
#[derive(Clone)]
pub struct PendingServerDecode {
pub stream_info: AvmStreamInfo,
pub state: DecodeState,
pub start_time: Instant,
pub started_loading_first_frame: bool,
// For retries:
pub bytes: Arc<[u8]>,
pub retry_count: i32,
}
impl PendingServerDecode {
fn new(name: &str, state: DecodeState, bytes: Arc<[u8]>, retries_left: i32) -> Self {
let stream_info = AvmStreamInfo {
stream_name: name.into(),
proto_path_template: "".into(),
num_frames: 0,
thumbnail_png: None,
};
Self {
stream_info,
state,
start_time: Instant::now(),
started_loading_first_frame: false,
bytes,
retry_count: retries_left,
}
}
}
pub struct ServerDecodeManager {
pub pending_decodes: Arc<Mutex<HashMap<String, PendingServerDecode>>>,
last_check_request: Instant,
}
impl ServerDecodeManager {
pub fn new() -> Self {
Self {
pending_decodes: Arc::new(Mutex::new(HashMap::new())),
last_check_request: Instant::now(),
}
}
pub fn prompt_stream(&self) {
let task = AsyncFileDialog::new()
.add_filter("AVM Stream", &["bin", "ivf", "obu"])
.pick_file();
let pending_decodes = self.pending_decodes.clone();
spawn_local(async move {
let file = task.await;
if let Some(file) = file {
let bytes = file.read().await;
upload(file.file_name().as_str(), bytes.into(), pending_decodes, MAX_RETRIES);
}
});
}
pub fn handle_dropped_file(&self, file: DroppedFile) {
let pending_decodes = self.pending_decodes.clone();
if let Some(bytes) = file.bytes {
spawn_local(async move {
upload(file.name.as_str(), bytes, pending_decodes, MAX_RETRIES);
});
}
}
pub fn check_newly_confirmed_uploads(&mut self, stream: &mut Option<Stream>, streams_url: &str) {
let mut pending_decodes = self.pending_decodes.lock().unwrap();
for (_name, pending_decode) in pending_decodes.iter_mut() {
if let DecodeState::UploadComplete = pending_decode.state {
*stream = Some(Stream::from_http(pending_decode.stream_info.clone(), true, 0, streams_url).unwrap());
pending_decode.state = DecodeState::Pending(DecodeProgress {
decoded_frames: 0,
total_frames: pending_decode.stream_info.num_frames,
});
}
}
}
fn check_for_retries(&mut self) {
let mut retries = Vec::new();
{
let mut pending_decodes = self.pending_decodes.lock().unwrap();
for (_name, pending_decode) in pending_decodes.iter_mut() {
if matches!(pending_decode.state, DecodeState::Failed) {
retries.push(pending_decode.clone());
}
}
}
for retry in retries {
if retry.retry_count > 0 {
warn!("Retrying stream: {}. Retries remaining: {}", retry.stream_info.stream_name, retry.retry_count - 1);
upload(
&retry.stream_info.stream_name,
retry.bytes,
self.pending_decodes.clone(),
retry.retry_count - 1
);
}
else {
warn!("Stream: {} failed after {} retries.", retry.stream_info.stream_name, MAX_RETRIES);
}
}
}
pub fn check_progress(&mut self, stream: &mut Option<Stream>, http_manager: &mut HttpStreamManager, streams_url: &str) {
self.check_newly_confirmed_uploads(stream, streams_url);
self.check_for_retries();
let now = Instant::now();
let elapsed = now - self.last_check_request;
if elapsed.as_secs_f32() < HTTP_POLL_PERIOD_SECONDS {
return;
}
self.last_check_request = now;
let mut pending_decodes = self.pending_decodes.lock().unwrap();
for (_name, pending_decode) in pending_decodes.iter() {
if let DecodeState::Pending(_) = &pending_decode.state {
let request = ehttp::Request::get(format!(
"/progress?stream_name={}",
pending_decode.stream_info.stream_name
));
let inner_pending_decodes = self.pending_decodes.clone();
ehttp::fetch(request, move |response| {
if let Ok(response) = response {
if let Some(json) = response.text() {
if let Ok(response) = serde_json::from_str::<ProgressResponse>(json) {
info!("Progress: {response:?}");
let mut pending_decodes = inner_pending_decodes.lock().unwrap();
if let Some(pending_decode) = pending_decodes.get_mut(&response.stream_name) {
pending_decode.state = response.state;
} else {
warn!(
"Received decode progress update for unknown stream: {}.",
response.stream_name
);
}
}
}
}
});
}
}
if let Some(stream) = stream {
// TODO(comc): Move this logic elsewhere.
http_manager.update_pending_decodes(&pending_decodes);
stream.update_pending_decodes(&mut pending_decodes);
}
}
}
const MULTIPART_FORM_BOUNDARY_PREFIX: &str = "--------";
const MULTIPART_FORM_BOUNDARY_LEN: usize = 16;
fn random_boundary() -> String {
let alphanum: String = rand::thread_rng()
.sample_iter(Alphanumeric)
.take(MULTIPART_FORM_BOUNDARY_LEN)
.map(char::from)
.collect();
format!("{MULTIPART_FORM_BOUNDARY_PREFIX}{alphanum}")
}
fn create_upload_request(file_name: &str, bytes: Arc<[u8]>) -> Result<ehttp::Request, io::Error> {
let stream_name = stream_name_from_file_name(file_name);
let mut request_body = Vec::new();
let mut cursor = Cursor::new(&mut request_body);
let boundary = random_boundary();
write!(cursor, "--{}\r\n", boundary)?;
write!(
cursor,
"Content-Disposition: form-data; name=\"{stream_name}\"; filename=\"{file_name}\""
)?;
write!(cursor, "\r\nContent-Type: {}", mime::APPLICATION_OCTET_STREAM)?;
write!(cursor, "\r\n\r\n")?;
cursor.write_all(&bytes)?;
write!(cursor, "\r\n")?;
write!(cursor, "--{}--\r\n", boundary)?;
let content_type = format!("multipart/form-data; boundary={}", boundary);
Ok(ehttp::Request {
method: "POST".into(),
url: "/upload".into(),
body: request_body,
headers: ehttp::headers(&[("Accept", "*/*"), ("Content-Type", &content_type)]),
})
}
// TODO(comc): Handle duplicate stream names.
pub fn upload(file_name: &str, bytes: Arc<[u8]>, pending_decodes: Arc<Mutex<HashMap<String, PendingServerDecode>>>, retries_left: i32) {
let stream_name = stream_name_from_file_name(file_name);
{
let mut pending_decodes = pending_decodes.lock().unwrap();
if let Some(existing_decode) = pending_decodes.get(&stream_name) {
match existing_decode.state {
DecodeState::Pending(_) | DecodeState::UploadComplete | DecodeState::Uploading => {
warn!("Decode request for stream {stream_name} already pending.");
return;
}
_ => {}
}
}
let new_decode = PendingServerDecode::new(&stream_name, DecodeState::Uploading, bytes.clone(), retries_left);
pending_decodes.insert(stream_name.clone(), new_decode);
}
let req = create_upload_request(file_name, bytes).unwrap();
// TODO(comc): Automatic retry on timeout.
ehttp::fetch(req, move |response| {
let mut decode_state = DecodeState::Failed;
let mut updated_stream_info = None;
if let Ok(response) = response {
if let Some(json) = response.text() {
info!("Got response: {}", json);
if let Ok(response) = serde_json::from_str::<StartDecodeResponse>(json) {
updated_stream_info = Some(response.stream_info);
decode_state = DecodeState::UploadComplete;
}
}
}
let mut pending_decodes = pending_decodes.lock().unwrap();
if let Some(pending_decode) = pending_decodes.get_mut(&stream_name) {
pending_decode.state = decode_state;
if let Some(updated_stream_info) = updated_stream_info {
pending_decode.stream_info = updated_stream_info;
}
} else {
warn!("Received status response for {stream_name}, but it no longer exists.");
}
});
}