From a3016be63cfdb530f046d40453c55adfc7cc30c8 Mon Sep 17 00:00:00 2001
From: Ericson Soares
Date: Mon, 29 Apr 2024 18:47:58 -0300
Subject: [PATCH] Media data extraction task

---
 Cargo.lock                                        |   1 +
 Cargo.toml                                        |  42 +--
 core/crates/heavy-lifting/Cargo.toml              |   1 +
 core/crates/heavy-lifting/src/lib.rs              |   2 +
 .../heavy-lifting/src/media_processor/mod.rs      |  18 +
 .../tasks/media_data_extractor.rs                 | 332 +++++++++++++++++-
 6 files changed, 371 insertions(+), 25 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9d4e2afbd3e3..cb0d5359bb97 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8641,6 +8641,7 @@ dependencies = [
  "globset",
  "itertools 0.12.0",
  "lending-stream",
+ "once_cell",
  "prisma-client-rust",
  "rmp-serde",
  "rmpv",
diff --git a/Cargo.toml b/Cargo.toml
index 309aa5a98a4e..c481b5e57039 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,17 +1,17 @@
 [workspace]
 resolver = "2"
 members = [
-    "core",
-    "core/crates/*",
-    "crates/*",
-    "apps/cli",
-    "apps/p2p-relay",
-    "apps/desktop/src-tauri",
-    "apps/desktop/crates/*",
-    "apps/mobile/modules/sd-core/core",
-    "apps/mobile/modules/sd-core/android/crate",
-    "apps/mobile/modules/sd-core/ios/crate",
-    "apps/server",
+	"core",
+	"core/crates/*",
+	"crates/*",
+	"apps/cli",
+	"apps/p2p-relay",
+	"apps/desktop/src-tauri",
+	"apps/desktop/crates/*",
+	"apps/mobile/modules/sd-core/core",
+	"apps/mobile/modules/sd-core/android/crate",
+	"apps/mobile/modules/sd-core/ios/crate",
+	"apps/server",
 ]
 
 [workspace.package]
@@ -22,19 +22,19 @@ repository = "https://github.com/spacedriveapp/spacedrive"
 [workspace.dependencies]
 # First party dependencies
 prisma-client-rust = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
-    "specta",
-    "sqlite-create-many",
-    "migrations",
-    "sqlite",
+	"specta",
+	"sqlite-create-many",
+	"migrations",
+	"sqlite",
 ], default-features = false }
 prisma-client-rust-cli = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
-    "specta",
-    "sqlite-create-many",
-    "migrations",
-    "sqlite",
+	"specta",
+	"sqlite-create-many",
+	"migrations",
+	"sqlite",
 ], default-features = false }
 prisma-client-rust-sdk = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
-    "sqlite",
+	"sqlite",
 ], default-features = false }
 
 tracing = "0.1.40"
@@ -66,7 +66,7 @@ image = "0.24.7"
 itertools = "0.12.0"
 lending-stream = "1.0.0"
 normpath = "1.1.1"
-once_cell = "1.18.0"
+once_cell = "1.19.0"
 pin-project-lite = "0.2.13"
 rand = "0.8.5"
 rand_chacha = "0.3.1"
diff --git a/core/crates/heavy-lifting/Cargo.toml b/core/crates/heavy-lifting/Cargo.toml
index a9938a4fa19b..ed609ed78e34 100644
--- a/core/crates/heavy-lifting/Cargo.toml
+++ b/core/crates/heavy-lifting/Cargo.toml
@@ -32,6 +32,7 @@ futures-concurrency = { workspace = true }
 globset = { workspace = true }
 itertools = { workspace = true }
 lending-stream = { workspace = true }
+once_cell = { workspace = true }
 prisma-client-rust = { workspace = true }
 rmp-serde = { workspace = true }
 rmpv = { workspace = true }
diff --git a/core/crates/heavy-lifting/src/lib.rs b/core/crates/heavy-lifting/src/lib.rs
index 935e65e0bfd6..e98594258705 100644
--- a/core/crates/heavy-lifting/src/lib.rs
+++ b/core/crates/heavy-lifting/src/lib.rs
@@ -77,6 +77,8 @@ pub enum NonCriticalError {
 	Indexer(#[from] indexer::NonCriticalError),
 	#[error(transparent)]
 	FileIdentifier(#[from] file_identifier::NonCriticalError),
+	#[error(transparent)]
+	MediaProcessor(#[from] media_processor::NonCriticalError),
 }
 
 #[repr(i32)]
diff --git a/core/crates/heavy-lifting/src/media_processor/mod.rs b/core/crates/heavy-lifting/src/media_processor/mod.rs
index f7de0e5095d5..a781630b715c 100644
--- a/core/crates/heavy-lifting/src/media_processor/mod.rs
+++ b/core/crates/heavy-lifting/src/media_processor/mod.rs
@@ -1,5 +1,11 @@
+use sd_prisma::prisma::file_path;
 use sd_utils::error::FileIOError;
 
+use std::path::PathBuf;
+
+use serde::{Deserialize, Serialize};
+use specta::Type;
+
 mod tasks;
 
 #[derive(thiserror::Error, Debug)]
@@ -18,3 +24,15 @@ impl From<Error> for rspc::Error {
 		Self::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e)
 	}
 }
+
+#[derive(thiserror::Error, Debug, Serialize, Deserialize, Type)]
+pub enum NonCriticalError {
+	#[error("failed to extract media data from <image_path='{}'>: {1}", .0.display())]
+	FailedToExtractImageMediaData(PathBuf, String),
+	#[error("processing thread panicked while extracting media data from <image_path='{}'>: {1}", .0.display())]
+	PanicWhileExtractingImageMediaData(PathBuf, String),
+	#[error("file path missing object id: <file_path_id='{0}'>")]
+	FilePathMissingObjectId(file_path::id::Type),
+	#[error("failed to construct isolated file path data: <file_path_id='{0}'>: {1}")]
+	FailedToConstructIsolatedFilePathData(file_path::id::Type, String),
+}
diff --git a/core/crates/heavy-lifting/src/media_processor/tasks/media_data_extractor.rs b/core/crates/heavy-lifting/src/media_processor/tasks/media_data_extractor.rs
index c5c089f9dbc2..ac32f941f405 100644
--- a/core/crates/heavy-lifting/src/media_processor/tasks/media_data_extractor.rs
+++ b/core/crates/heavy-lifting/src/media_processor/tasks/media_data_extractor.rs
@@ -1,18 +1,115 @@
-use crate::Error;
+use crate::{media_processor, Error, NonCriticalError};
 
-use sd_task_system::{ExecStatus, Interrupter, Task, TaskId};
+use sd_core_file_path_helper::IsolatedFilePathData;
+use sd_core_prisma_helpers::file_path_for_media_processor;
+
+use sd_file_ext::extensions::{Extension, ImageExtension, ALL_IMAGE_EXTENSIONS};
+use sd_media_metadata::ImageMetadata;
+use sd_prisma::prisma::{file_path, location, media_data, object, PrismaClient};
+use sd_task_system::{
+	check_interruption, ExecStatus, Interrupter, IntoAnyTaskOutput, Task, TaskId,
+};
+
+use std::{
+	collections::{HashMap, HashSet},
+	mem,
+	path::{Path, PathBuf},
+	pin::pin,
+	sync::Arc,
+	time::Duration,
+};
+
+use futures::StreamExt;
+use futures_concurrency::future::FutureGroup;
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+use tokio::{task::spawn_blocking, time::Instant};
+
+use super::media_data_image_to_query;
 
 #[derive(Debug)]
 pub struct MediaDataExtractor {
 	id: TaskId,
+	file_paths: Vec<file_path_for_media_processor::Data>,
+	location_id: location::id::Type,
+	location_path: Arc<PathBuf>,
+	stage: Stage,
+	errors: Vec<NonCriticalError>,
+	db: Arc<PrismaClient>,
+	output: Output,
+	is_shallow: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+enum Stage {
+	Starting,
+	FetchedObjectsAlreadyWithMediaData(Vec<object::id::Type>),
+	ExtractingMediaData {
+		paths_by_id: HashMap<file_path::id::Type, (Arc<PathBuf>, object::id::Type)>,
+		// TODO: Change to support any kind of media data, not only images
+		media_datas: Vec<(ImageMetadata, object::id::Type)>,
+	},
+	SaveMediaData {
+		media_datas: Vec<(ImageMetadata, object::id::Type)>,
+	},
 }
 
 impl MediaDataExtractor {
-	pub fn new() -> Self {
+	fn new(
+		file_paths: &[file_path_for_media_processor::Data],
+		location_id: location::id::Type,
+		location_path: Arc<PathBuf>,
+		db: Arc<PrismaClient>,
+		is_shallow: bool,
+	) -> Self {
+		let mut errors = Vec::new();
+
 		Self {
 			id: TaskId::new_v4(),
+			file_paths: file_paths
+				.iter()
+				.filter(|file_path| {
+					if file_path.object_id.is_some() {
+						true
+					} else {
+						errors.push(
+							media_processor::NonCriticalError::FilePathMissingObjectId(
+								file_path.id,
+							)
+							.into(),
+						);
+						false
+					}
+				})
+				.cloned()
+				.collect(),
+			location_id,
+			location_path,
+			stage: Stage::Starting,
+			errors,
+			db,
+			output: Output::default(),
+			is_shallow,
 		}
 	}
+
+	pub fn new_deep(
+		file_paths: &[file_path_for_media_processor::Data],
+		location_id: location::id::Type,
+		location_path: Arc<PathBuf>,
+		db: Arc<PrismaClient>,
+	) -> Self {
+		Self::new(file_paths, location_id, location_path, db, false)
+	}
+
+	pub fn new_shallow(
+		file_paths: &[file_path_for_media_processor::Data],
+		location_id: location::id::Type,
+		location_path: Arc<PathBuf>,
+		db: Arc<PrismaClient>,
+	) -> Self {
+		Self::new(file_paths, location_id, location_path, db, true)
+	}
 }
 
 #[async_trait::async_trait]
@@ -21,7 +118,234 @@ impl Task<Error> for MediaDataExtractor {
 		self.id
 	}
 
+	fn with_priority(&self) -> bool {
+		self.is_shallow
+	}
+
 	async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, Error> {
-		Ok(ExecStatus::Canceled)
+		let Self {
+			file_paths,
+			location_id,
+			location_path,
+			stage,
+			errors,
+			db,
+			output,
+			..
+		} = self;
+
+		loop {
+			match stage {
+				Stage::Starting => {
+					let db_read_start = Instant::now();
+					let object_ids = fetch_objects_already_with_media_data(file_paths, db).await?;
+					output.db_read_time = db_read_start.elapsed();
+
+					*stage = Stage::FetchedObjectsAlreadyWithMediaData(object_ids);
+				}
+
+				Stage::FetchedObjectsAlreadyWithMediaData(objects_already_with_media_data) => {
+					let filtering_start = Instant::now();
+					if file_paths.len() == objects_already_with_media_data.len() {
+						// All files already have media data, skipping
+						#[allow(clippy::cast_possible_truncation)]
+						{
+							// SAFETY: we shouldn't have more than 4 billion unique objects already with media data
+							output.skipped = file_paths.len() as u32;
+						}
+						break;
+					}
+
+					let unique_objects_already_with_media_data =
+						mem::take(objects_already_with_media_data)
+							.into_iter()
+							.collect::<HashSet<_>>();
+
+					#[allow(clippy::cast_possible_truncation)]
+					{
+						// SAFETY: we shouldn't have more than 4 billion unique objects already with media data
+						output.skipped = unique_objects_already_with_media_data.len() as u32;
+					}
+
+					file_paths.retain(|file_path| {
+						!unique_objects_already_with_media_data
+							.contains(&file_path.object_id.expect("already checked"))
+					});
+
+					let paths_by_id = file_paths.iter().filter_map(|file_path| {
+						IsolatedFilePathData::try_from((*location_id, file_path))
+							.map_err(|e| errors.push(media_processor::NonCriticalError::FailedToConstructIsolatedFilePathData(file_path.id, e.to_string()).into()))
+							.map(|iso_file_path| {
+								(file_path.id, (Arc::new(location_path.join(iso_file_path)), file_path.object_id.expect("already checked")))
+							}).ok()
+					}).collect();
+
+					output.filtering_time = filtering_start.elapsed();
+
+					*stage = Stage::ExtractingMediaData {
+						paths_by_id,
+						media_datas: Vec::new(),
+					};
+				}
+
+				Stage::ExtractingMediaData {
+					paths_by_id,
+					media_datas,
+				} => {
+					let extraction_start = Instant::now();
+
+					let mut futures = pin!(paths_by_id
+						.iter()
+						.map(|(file_path_id, (path, object_id))| {
+							// Copy the values to make them owned and make the borrowck happy
+							let file_path_id = *file_path_id;
+							let path = Arc::clone(path);
+							let object_id = *object_id;
+
+							async move { (extract_media_data(&*path).await, file_path_id, object_id) }
+						})
+						.collect::<FutureGroup<_>>());
+
+					while let Some((res, file_path_id, object_id)) = futures.next().await {
+						match res {
+							Ok(Some(media_data)) => {
+								media_datas.push((media_data, object_id));
+							}
+							Ok(None) => {
+								// No media data found
+								output.skipped += 1;
+							}
+							Err(e) => errors.push(e.into()),
+						}
+
+						paths_by_id.remove(&file_path_id);
+
+						let extraction_time = &mut output.extraction_time;
+
+						check_interruption!(interrupter, extraction_start, extraction_time);
+					}
+
+					*stage = Stage::SaveMediaData {
+						media_datas: mem::take(media_datas),
+					};
+				}
+
+				Stage::SaveMediaData { media_datas } => {
+					let db_write_start = Instant::now();
+					output.extracted = save_media_data(mem::take(media_datas), db).await?;
+
+					output.db_write_time = db_write_start.elapsed();
+
+					#[allow(clippy::cast_possible_truncation)]
+					{
+						// SAFETY: we shouldn't have more than 4 billion errors LMAO
+						output.skipped += errors.len() as u32;
+					}
+
+					break;
+				}
+			}
+
+			check_interruption!(interrupter);
+		}
+
+		Ok(ExecStatus::Done(mem::take(output).into_output()))
 	}
 }
+
+#[derive(Serialize, Deserialize, Default, Debug)]
+pub struct Output {
+	pub extracted: u32,
+	pub skipped: u32,
+	pub db_read_time: Duration,
+	pub filtering_time: Duration,
+	pub extraction_time: Duration,
+	pub db_write_time: Duration,
+}
+
+pub(super) static FILTERED_IMAGE_EXTENSIONS: Lazy<Vec<Extension>> = Lazy::new(|| {
+	ALL_IMAGE_EXTENSIONS
+		.iter()
+		.copied()
+		.filter(can_extract_media_data_for_image)
+		.map(Extension::Image)
+		.collect()
+});
+
+pub const fn can_extract_media_data_for_image(image_extension: &ImageExtension) -> bool {
+	use ImageExtension::{
+		Avci, Avcs, Avif, Dng, Heic, Heif, Heifs, Hif, Jpeg, Jpg, Png, Tiff, Webp,
+	};
+	matches!(
+		image_extension,
+		Tiff | Dng | Jpeg | Jpg | Heif | Heifs | Heic | Avif | Avcs | Avci | Hif | Png | Webp
+	)
+}
+
+pub async fn extract_media_data(
+	path: impl AsRef<Path> + Send,
+) -> Result<Option<ImageMetadata>, media_processor::NonCriticalError> {
+	let path = path.as_ref().to_path_buf();
+
+	// Running in a separate blocking thread due to MediaData blocking behavior (due to sync exif lib)
+	spawn_blocking({
+		let path = path.clone();
+		|| match ImageMetadata::from_path(&path) {
+			Ok(media_data) => Ok(Some(media_data)),
+			Err(sd_media_metadata::Error::NoExifDataOnPath(_)) => Ok(None),
+			Err(e) => Err(
+				media_processor::NonCriticalError::FailedToExtractImageMediaData(
+					path,
+					e.to_string(),
+				),
+			),
+		}
+	})
+	.await
+	.map_err(|e| {
+		media_processor::NonCriticalError::PanicWhileExtractingImageMediaData(path, e.to_string())
+	})?
+}
+
+async fn fetch_objects_already_with_media_data(
+	file_paths: &[file_path_for_media_processor::Data],
+	db: &PrismaClient,
+) -> Result<Vec<object::id::Type>, media_processor::Error> {
+	db.media_data()
+		.find_many(vec![media_data::object_id::in_vec(
+			file_paths
+				.iter()
+				.filter_map(|file_path| file_path.object_id)
+				.collect(),
+		)])
+		.select(media_data::select!({ object_id }))
+		.exec()
+		.await
+		.map(|object_ids| object_ids.into_iter().map(|data| data.object_id).collect())
+		.map_err(Into::into)
+}
+
+async fn save_media_data(
+	media_datas: Vec<(ImageMetadata, object::id::Type)>,
+	db: &PrismaClient,
+) -> Result<u32, media_processor::Error> {
+	db.media_data()
+		.create_many(
+			media_datas
+				.into_iter()
+				.map(|(media_data, object_id)| media_data_image_to_query(media_data, object_id))
+				.collect(),
+		)
+		.skip_duplicates()
+		.exec()
+		.await
+		.map(|created| {
+			#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
+			{
+				// SAFETY: we can't create a negative amount of media_data and we won't create more than
+				// 4 billion media_data entries
+				created as u32
+			}
+		})
+		.map_err(Into::into)
+}