diff --git a/Cargo.lock b/Cargo.lock index 3a38b0ab6fcc..9d4e2afbd3e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8650,6 +8650,7 @@ dependencies = [ "sd-core-prisma-helpers", "sd-core-sync", "sd-file-ext", + "sd-media-metadata", "sd-prisma", "sd-sync", "sd-task-system", diff --git a/core/crates/heavy-lifting/Cargo.toml b/core/crates/heavy-lifting/Cargo.toml index fb73a63fd878..a9938a4fa19b 100644 --- a/core/crates/heavy-lifting/Cargo.toml +++ b/core/crates/heavy-lifting/Cargo.toml @@ -17,12 +17,12 @@ sd-core-sync = { path = "../sync" } # Sub-crates sd-file-ext = { path = "../../../crates/file-ext" } +sd-media-metadata = { path = "../../../crates/media-metadata" } sd-prisma = { path = "../../../crates/prisma" } sd-sync = { path = "../../../crates/sync" } sd-task-system = { path = "../../../crates/task-system" } sd-utils = { path = "../../../crates/utils" } - async-channel = { workspace = true } async-trait = { workspace = true } blake3 = { workspace = true } diff --git a/core/crates/heavy-lifting/src/file_identifier/job.rs b/core/crates/heavy-lifting/src/file_identifier/job.rs index d01a55f5042b..008dc5e1dadb 100644 --- a/core/crates/heavy-lifting/src/file_identifier/job.rs +++ b/core/crates/heavy-lifting/src/file_identifier/job.rs @@ -1,4 +1,5 @@ use crate::{ + file_identifier, job_system::{ job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus}, report::ReportOutputMetadata, @@ -6,7 +7,7 @@ use crate::{ SerializableJob, SerializedTasks, }, utils::sub_path::maybe_get_iso_file_path_from_sub_path, - Error, JobContext, JobName, LocationScanState, NonCriticalJobError, ProgressUpdate, + Error, JobContext, JobName, LocationScanState, NonCriticalError, ProgressUpdate, }; use sd_core_file_path_helper::IsolatedFilePathData; @@ -41,7 +42,7 @@ use super::{ ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, ObjectProcessorTask, ObjectProcessorTaskMetrics, }, - FileIdentifierError, CHUNK_SIZE, + CHUNK_SIZE, }; #[derive(Debug)] @@ -52,7 +53,7 @@ pub struct FileIdentifierJob { metadata: Metadata, - errors: Vec, + errors: Vec, pending_tasks_on_resume: Vec>, tasks_for_shutdown: Vec>>, @@ -79,7 +80,7 @@ impl Job for FileIdentifierJob { self.pending_tasks_on_resume = dispatcher .dispatch_many_boxed( rmp_serde::from_slice::)>>(&serialized_tasks) - .map_err(FileIdentifierError::from)? + .map_err(file_identifier::Error::from)? 
.into_iter() .map(|(task_kind, task_bytes)| async move { match task_kind { @@ -103,7 +104,7 @@ impl Job for FileIdentifierJob { .collect::>() .try_join() .await - .map_err(FileIdentifierError::from)?, + .map_err(file_identifier::Error::from)?, ) .await; @@ -181,7 +182,7 @@ impl Job for FileIdentifierJob { ) .exec() .await - .map_err(FileIdentifierError::from)?; + .map_err(file_identifier::Error::from)?; Ok(ReturnStatus::Completed( JobReturn::builder() @@ -196,7 +197,7 @@ impl FileIdentifierJob { pub fn new( location: location::Data, sub_path: Option, - ) -> Result { + ) -> Result { Ok(Self { location_path: maybe_missing(&location.path, "location.path") .map(PathBuf::from) @@ -215,7 +216,7 @@ impl FileIdentifierJob { pending_running_tasks: &mut FuturesUnordered>, job_ctx: &impl JobContext, dispatcher: &JobTaskDispatcher, - ) -> Result<(), FileIdentifierError> { + ) -> Result<(), file_identifier::Error> { // if we don't have any pending task, then this is a fresh job if self.pending_tasks_on_resume.is_empty() { let db = job_ctx.db(); @@ -399,7 +400,7 @@ struct SaveState { metadata: Metadata, - errors: Vec, + errors: Vec, tasks_for_shutdown_bytes: Option, } diff --git a/core/crates/heavy-lifting/src/file_identifier/mod.rs b/core/crates/heavy-lifting/src/file_identifier/mod.rs index 6659ef37567d..c7bab281cf72 100644 --- a/core/crates/heavy-lifting/src/file_identifier/mod.rs +++ b/core/crates/heavy-lifting/src/file_identifier/mod.rs @@ -1,4 +1,4 @@ -use crate::utils::sub_path::SubPathError; +use crate::utils::sub_path; use sd_core_file_path_helper::{FilePathError, IsolatedFilePathData}; @@ -28,7 +28,7 @@ pub use shallow::shallow; const CHUNK_SIZE: usize = 100; #[derive(thiserror::Error, Debug)] -pub enum FileIdentifierError { +pub enum Error { #[error("missing field on database: {0}")] MissingField(#[from] MissingFieldError), #[error("failed to deserialized stored tasks for job resume: {0}")] @@ -39,13 +39,13 @@ pub enum FileIdentifierError { #[error(transparent)] FilePathError(#[from] FilePathError), #[error(transparent)] - SubPath(#[from] SubPathError), + SubPath(#[from] sub_path::Error), } -impl From for rspc::Error { - fn from(err: FileIdentifierError) -> Self { +impl From for rspc::Error { + fn from(err: Error) -> Self { match err { - FileIdentifierError::SubPath(sub_path_err) => sub_path_err.into(), + Error::SubPath(sub_path_err) => sub_path_err.into(), _ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err), } @@ -53,7 +53,7 @@ impl From for rspc::Error { } #[derive(thiserror::Error, Debug, Serialize, Deserialize, Type)] -pub enum NonCriticalFileIdentifierError { +pub enum NonCriticalError { #[error("failed to extract file metadata: {0}")] FailedToExtractFileMetadata(String), #[cfg(target_os = "windows")] diff --git a/core/crates/heavy-lifting/src/file_identifier/shallow.rs b/core/crates/heavy-lifting/src/file_identifier/shallow.rs index ef85a07b8572..8e3ad3727eab 100644 --- a/core/crates/heavy-lifting/src/file_identifier/shallow.rs +++ b/core/crates/heavy-lifting/src/file_identifier/shallow.rs @@ -1,4 +1,7 @@ -use crate::{utils::sub_path::maybe_get_iso_file_path_from_sub_path, Error, NonCriticalJobError}; +use crate::{ + file_identifier, utils::sub_path::maybe_get_iso_file_path_from_sub_path, Error, + NonCriticalError, +}; use sd_core_file_path_helper::IsolatedFilePathData; use sd_core_prisma_helpers::file_path_for_file_identifier; @@ -22,7 +25,7 @@ use tracing::{debug, warn}; use super::{ tasks::{ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, 
ObjectProcessorTask}, - FileIdentifierError, CHUNK_SIZE, + CHUNK_SIZE, }; pub async fn shallow( @@ -32,24 +35,24 @@ pub async fn shallow( db: Arc, sync: Arc, invalidate_query: impl Fn(&'static str) + Send + Sync, -) -> Result, Error> { +) -> Result, Error> { let sub_path = sub_path.as_ref(); let location_path = maybe_missing(&location.path, "location.path") .map(PathBuf::from) .map(Arc::new) - .map_err(FileIdentifierError::from)?; + .map_err(file_identifier::Error::from)?; let location = Arc::new(location); let sub_iso_file_path = maybe_get_iso_file_path_from_sub_path(location.id, &Some(sub_path), &*location_path, &db) .await - .map_err(FileIdentifierError::from)? + .map_err(file_identifier::Error::from)? .map_or_else( || { IsolatedFilePathData::new(location.id, &*location_path, &*location_path, true) - .map_err(FileIdentifierError::from) + .map_err(file_identifier::Error::from) }, Ok, )?; @@ -74,7 +77,7 @@ pub async fn shallow( .select(file_path_for_file_identifier::select()) .exec() .await - .map_err(FileIdentifierError::from)?; + .map_err(file_identifier::Error::from)?; let Some(last_orphan) = orphan_paths.last() else { // No orphans here! @@ -117,7 +120,7 @@ async fn process_tasks( dispatcher: BaseTaskDispatcher, db: Arc, sync: Arc, -) -> Result, Error> { +) -> Result, Error> { let mut pending_running_tasks = pending_running_tasks.lend_mut(); let mut errors = vec![]; diff --git a/core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs b/core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs index ef9b2af9b81a..f57b82d14e91 100644 --- a/core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs +++ b/core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs @@ -1,6 +1,6 @@ use crate::{ - file_identifier::{FileMetadata, NonCriticalFileIdentifierError}, - Error, NonCriticalJobError, + file_identifier::{self, FileMetadata}, + Error, NonCriticalError, }; use sd_core_file_path_helper::IsolatedFilePathData; @@ -34,7 +34,7 @@ pub struct ExtractFileMetadataTask { file_paths_by_id: HashMap, identified_files: HashMap, extract_metadata_time: Duration, - errors: Vec, + errors: Vec, is_shallow: bool, } @@ -42,7 +42,7 @@ pub struct ExtractFileMetadataTask { pub struct ExtractFileMetadataTaskOutput { pub identified_files: HashMap, pub extract_metadata_time: Duration, - pub errors: Vec, + pub errors: Vec, } impl ExtractFileMetadataTask { @@ -207,7 +207,7 @@ fn handle_non_critical_errors( location_id: location::id::Type, file_path_pub_id: Uuid, e: &FileIOError, - errors: &mut Vec, + errors: &mut Vec, ) { error!("Failed to extract file metadata : {e:#?}"); @@ -218,14 +218,15 @@ fn handle_non_critical_errors( // Handle case where file is on-demand (NTFS only) if e.source.raw_os_error().map_or(false, |code| code == 362) { errors.push( - NonCriticalFileIdentifierError::FailedToExtractMetadataFromOnDemandFile( + file_identifier::NonCriticalError::FailedToExtractMetadataFromOnDemandFile( formatted_error, ) .into(), ); } else { errors.push( - NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(), + file_identifier::NonCriticalError::FailedToExtractFileMetadata(formatted_error) + .into(), ); } } @@ -233,7 +234,7 @@ fn handle_non_critical_errors( #[cfg(not(target_os = "windows"))] { errors.push( - NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(), + file_identifier::NonCriticalError::FailedToExtractFileMetadata(formatted_error).into(), ); } } @@ -243,7 +244,7 @@ fn 
try_iso_file_path_extraction( file_path_pub_id: Uuid, file_path: &file_path_for_file_identifier::Data, location_path: Arc, - errors: &mut Vec, + errors: &mut Vec, ) -> Option<(Uuid, IsolatedFilePathData<'static>, Arc)> { IsolatedFilePathData::try_from((location_id, file_path)) .map(IsolatedFilePathData::to_owned) @@ -251,7 +252,7 @@ fn try_iso_file_path_extraction( .map_err(|e| { error!("Failed to extract isolated file path data: {e:#?}"); errors.push( - NonCriticalFileIdentifierError::FailedToExtractIsolatedFilePathData(format!( + file_identifier::NonCriticalError::FailedToExtractIsolatedFilePathData(format!( "" )) .into(), diff --git a/core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs b/core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs index cdb9f0842e6a..1bddeb03782b 100644 --- a/core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs +++ b/core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs @@ -1,4 +1,4 @@ -use crate::{file_identifier::FileIdentifierError, Error}; +use crate::{file_identifier, Error}; use sd_core_prisma_helpers::{ file_path_for_file_identifier, file_path_pub_id, object_for_file_identifier, @@ -208,7 +208,7 @@ async fn assign_cas_id_to_file_paths( identified_files: &HashMap, db: &PrismaClient, sync: &SyncManager, -) -> Result<(), FileIdentifierError> { +) -> Result<(), file_identifier::Error> { // Assign cas_id to each file path sync.write_ops( db, @@ -243,7 +243,7 @@ async fn assign_cas_id_to_file_paths( async fn fetch_existing_objects_by_cas_id( identified_files: &HashMap, db: &PrismaClient, -) -> Result, FileIdentifierError> { +) -> Result, file_identifier::Error> { // Retrieves objects that are already connected to file paths with the same id db.object() .find_many(vec![object::file_paths::some(vec![ @@ -280,7 +280,7 @@ async fn assign_existing_objects_to_file_paths( objects_by_cas_id: &HashMap, db: &PrismaClient, sync: &SyncManager, -) -> Result, FileIdentifierError> { +) -> Result, file_identifier::Error> { // Attempt to associate each file path with an object that has been // connected to file paths with the same cas_id sync.write_ops( @@ -341,7 +341,7 @@ async fn create_objects( identified_files: &HashMap, db: &PrismaClient, sync: &SyncManager, -) -> Result { +) -> Result { trace!("Creating {} new Objects", identified_files.len(),); let (object_create_args, file_path_update_args) = identified_files diff --git a/core/crates/heavy-lifting/src/indexer/job.rs b/core/crates/heavy-lifting/src/indexer/job.rs index dacea9ab5c26..0f2f4535d4a8 100644 --- a/core/crates/heavy-lifting/src/indexer/job.rs +++ b/core/crates/heavy-lifting/src/indexer/job.rs @@ -1,5 +1,5 @@ use crate::{ - indexer::BATCH_SIZE, + indexer, job_system::{ job::{ Job, JobContext, JobName, JobReturn, JobTaskDispatcher, ProgressUpdate, ReturnStatus, @@ -9,7 +9,7 @@ use crate::{ SerializableJob, SerializedTasks, }, utils::sub_path::get_full_path_from_sub_path, - Error, LocationScanState, NonCriticalJobError, + Error, LocationScanState, NonCriticalError, }; use sd_core_file_path_helper::IsolatedFilePathData; @@ -47,7 +47,7 @@ use super::{ updater::{UpdateTask, UpdateTaskOutput}, walker::{WalkDirTask, WalkTaskOutput, WalkedEntry}, }, - update_directory_sizes, update_location_size, IndexerError, IsoFilePathFactory, WalkerDBProxy, + update_directory_sizes, update_location_size, IsoFilePathFactory, WalkerDBProxy, BATCH_SIZE, }; #[derive(Debug)] @@ -63,7 +63,7 @@ pub struct IndexerJob { ancestors_already_indexed: HashSet>, 
iso_paths_and_sizes: HashMap, u64>, - errors: Vec, + errors: Vec, pending_tasks_on_resume: Vec>, tasks_for_shutdown: Vec>>, @@ -83,7 +83,7 @@ impl Job for IndexerJob { self.pending_tasks_on_resume = dispatcher .dispatch_many_boxed( rmp_serde::from_slice::)>>(&serialized_tasks) - .map_err(IndexerError::from)? + .map_err(indexer::Error::from)? .into_iter() .map(|(task_kind, task_bytes)| { let indexer_ruler = self.indexer_ruler.clone(); @@ -123,7 +123,7 @@ impl Job for IndexerJob { .collect::>() .try_join() .await - .map_err(IndexerError::from)?, + .map_err(indexer::Error::from)?, ) .await; @@ -243,7 +243,7 @@ impl Job for IndexerJob { ) .exec() .await - .map_err(IndexerError::from)?; + .map_err(indexer::Error::from)?; Ok(ReturnStatus::Completed( JobReturn::builder() @@ -258,7 +258,7 @@ impl IndexerJob { pub fn new( location: location_with_indexer_rules::Data, sub_path: Option, - ) -> Result { + ) -> Result { Ok(Self { indexer_ruler: location .indexer_rules @@ -297,7 +297,7 @@ impl IndexerJob { any_task_output: Box, job_ctx: &impl JobContext, dispatcher: &JobTaskDispatcher, - ) -> Result>, IndexerError> { + ) -> Result>, indexer::Error> { self.metadata.completed_tasks += 1; job_ctx.progress(vec![ProgressUpdate::CompletedTaskCount( @@ -350,7 +350,7 @@ impl IndexerJob { }: WalkTaskOutput, job_ctx: &impl JobContext, dispatcher: &JobTaskDispatcher, - ) -> Result>, IndexerError> { + ) -> Result>, indexer::Error> { self.metadata.scan_read_time += scan_time; let (to_create_count, to_update_count) = (to_create.len(), to_update.len()); @@ -540,7 +540,7 @@ impl IndexerJob { pending_running_tasks: &mut FuturesUnordered>, job_ctx: &impl JobContext, dispatcher: &JobTaskDispatcher, - ) -> Result<(), IndexerError> { + ) -> Result<(), indexer::Error> { // if we don't have any pending task, then this is a fresh job if self.pending_tasks_on_resume.is_empty() { let walker_root_path = Arc::new( @@ -633,7 +633,7 @@ struct SaveState { ancestors_already_indexed: HashSet>, paths_and_sizes: HashMap, u64>, - errors: Vec, + errors: Vec, tasks_for_shutdown_bytes: Option, } diff --git a/core/crates/heavy-lifting/src/indexer/mod.rs b/core/crates/heavy-lifting/src/indexer/mod.rs index 2bac41b1b8dc..4e482d712967 100644 --- a/core/crates/heavy-lifting/src/indexer/mod.rs +++ b/core/crates/heavy-lifting/src/indexer/mod.rs @@ -1,4 +1,4 @@ -use crate::{utils::sub_path::SubPathError, NonCriticalJobError}; +use crate::utils::sub_path; use sd_core_file_path_helper::{FilePathError, IsolatedFilePathData}; use sd_core_indexer_rules::IndexerRuleError; @@ -8,7 +8,7 @@ use sd_core_prisma_helpers::{ use sd_core_sync::Manager as SyncManager; use sd_prisma::{ - prisma::{file_path, location, PrismaClient, SortOrder}, + prisma::{file_path, indexer_rule, location, PrismaClient, SortOrder}, prisma_sync, }; use sd_sync::OperationFactory; @@ -46,12 +46,12 @@ use tasks::walker; const BATCH_SIZE: usize = 1000; #[derive(thiserror::Error, Debug)] -pub enum IndexerError { +pub enum Error { // Not Found errors #[error("indexer rule not found: ")] - IndexerRuleNotFound(i32), + IndexerRuleNotFound(indexer_rule::id::Type), #[error(transparent)] - SubPath(#[from] SubPathError), + SubPath(#[from] sub_path::Error), // Internal Errors #[error("database Error: {0}")] @@ -72,16 +72,16 @@ pub enum IndexerError { Rules(#[from] IndexerRuleError), } -impl From for rspc::Error { - fn from(err: IndexerError) -> Self { +impl From for rspc::Error { + fn from(err: Error) -> Self { match err { - IndexerError::IndexerRuleNotFound(_) => { + Error::IndexerRuleNotFound(_) => 
{ Self::with_cause(ErrorCode::NotFound, err.to_string(), err) } - IndexerError::SubPath(sub_path_err) => sub_path_err.into(), + Error::SubPath(sub_path_err) => sub_path_err.into(), - IndexerError::Rules(rule_err) => rule_err.into(), + Error::Rules(rule_err) => rule_err.into(), _ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err), } @@ -89,7 +89,7 @@ impl From for rspc::Error { } #[derive(thiserror::Error, Debug, Serialize, Deserialize, Type)] -pub enum NonCriticalIndexerError { +pub enum NonCriticalError { #[error("failed to read directory entry: {0}")] FailedDirectoryEntry(String), #[error("failed to fetch metadata: {0}")] @@ -134,7 +134,7 @@ async fn update_directory_sizes( iso_paths_and_sizes: HashMap, u64, impl BuildHasher + Send>, db: &PrismaClient, sync: &SyncManager, -) -> Result<(), IndexerError> { +) -> Result<(), Error> { let to_sync_and_update = db ._batch(chunk_db_queries(iso_paths_and_sizes.keys(), db)) .await? @@ -160,7 +160,7 @@ async fn update_directory_sizes( ), )) }) - .collect::, IndexerError>>()? + .collect::, Error>>()? .into_iter() .unzip::<_, _, Vec<_>, Vec<_>>(); @@ -173,7 +173,7 @@ async fn update_location_size( location_id: location::id::Type, db: &PrismaClient, invalidate_query: &InvalidateQuery, -) -> Result<(), IndexerError> { +) -> Result<(), Error> { let total_size = db .file_path() .find_many(vec![ @@ -211,7 +211,7 @@ async fn remove_non_existing_file_paths( to_remove: Vec, db: &PrismaClient, sync: &sd_core_sync::Manager, -) -> Result { +) -> Result { #[allow(clippy::cast_sign_loss)] let (sync_params, db_params): (Vec<_>, Vec<_>) = to_remove .into_iter() @@ -248,8 +248,8 @@ async fn reverse_update_directories_sizes( location_path: impl AsRef + Send, db: &PrismaClient, sync: &SyncManager, - errors: &mut Vec, -) -> Result<(), IndexerError> { + errors: &mut Vec, +) -> Result<(), Error> { let location_path = location_path.as_ref(); let ancestors = base_path @@ -279,7 +279,7 @@ async fn reverse_update_directories_sizes( IsolatedFilePathData::try_from(file_path) .map_err(|e| { errors.push( - NonCriticalIndexerError::MissingFilePathData(format!( + NonCriticalError::MissingFilePathData(format!( "Found a file_path missing data: , error: {e:#?}", from_bytes_to_uuid(&pub_id) )) @@ -345,8 +345,8 @@ async fn compute_sizes( materialized_paths: Vec, pub_id_by_ancestor_materialized_path: &mut HashMap, db: &PrismaClient, - errors: &mut Vec, -) -> Result<(), IndexerError> { + errors: &mut Vec, +) -> Result<(), Error> { db.file_path() .find_many(vec![ file_path::location_id::equals(Some(location_id)), @@ -371,7 +371,7 @@ async fn compute_sizes( } } else { errors.push( - NonCriticalIndexerError::MissingFilePathData(format!( + NonCriticalError::MissingFilePathData(format!( "Corrupt database possessing a file_path entry without materialized_path: ", from_bytes_to_uuid(&file_path.pub_id) )) @@ -409,7 +409,7 @@ impl walker::WalkerDBProxy for WalkerDBProxy { async fn fetch_file_paths( &self, found_paths: Vec, - ) -> Result, IndexerError> { + ) -> Result, Error> { // Each found path is a AND with 4 terms, and SQLite has a expression tree limit of 1000 terms // so we will use chunks of 200 just to be safe self.db @@ -435,7 +435,7 @@ impl walker::WalkerDBProxy for WalkerDBProxy { &self, parent_iso_file_path: &IsolatedFilePathData<'_>, unique_location_id_materialized_path_name_extension_params: Vec, - ) -> Result, NonCriticalIndexerError> { + ) -> Result, NonCriticalError> { // NOTE: This batch size can be increased if we wish to trade memory for more performance 
const BATCH_SIZE: i64 = 1000; @@ -461,7 +461,7 @@ impl walker::WalkerDBProxy for WalkerDBProxy { .flat_map(|file_paths| file_paths.into_iter().map(|file_path| file_path.id)) .collect::>() }) - .map_err(|e| NonCriticalIndexerError::FetchAlreadyExistingFilePathIds(e.to_string()))?; + .map_err(|e| NonCriticalError::FetchAlreadyExistingFilePathIds(e.to_string()))?; let mut to_remove = vec![]; let mut cursor = 1; @@ -484,7 +484,7 @@ impl walker::WalkerDBProxy for WalkerDBProxy { .select(file_path_pub_and_cas_ids::select()) .exec() .await - .map_err(|e| NonCriticalIndexerError::FetchFilePathsToRemove(e.to_string()))?; + .map_err(|e| NonCriticalError::FetchFilePathsToRemove(e.to_string()))?; #[allow(clippy::cast_possible_truncation)] // Safe because we are using a constant let should_stop = found.len() < BATCH_SIZE as usize; diff --git a/core/crates/heavy-lifting/src/indexer/shallow.rs b/core/crates/heavy-lifting/src/indexer/shallow.rs index 96eaf4398934..44c3ebc512a7 100644 --- a/core/crates/heavy-lifting/src/indexer/shallow.rs +++ b/core/crates/heavy-lifting/src/indexer/shallow.rs @@ -1,4 +1,4 @@ -use crate::{utils::sub_path::get_full_path_from_sub_path, Error, NonCriticalJobError}; +use crate::{indexer, utils::sub_path::get_full_path_from_sub_path, Error, NonCriticalError}; use sd_core_indexer_rules::{IndexerRule, IndexerRuler}; use sd_core_prisma_helpers::location_with_indexer_rules; @@ -25,8 +25,7 @@ use super::{ updater::{UpdateTask, UpdateTaskOutput}, walker::{ToWalkEntry, WalkDirTask, WalkTaskOutput, WalkedEntry}, }, - update_directory_sizes, update_location_size, IndexerError, IsoFilePathFactory, WalkerDBProxy, - BATCH_SIZE, + update_directory_sizes, update_location_size, IsoFilePathFactory, WalkerDBProxy, BATCH_SIZE, }; pub async fn shallow( @@ -36,18 +35,18 @@ pub async fn shallow( db: Arc, sync: Arc, invalidate_query: impl Fn(&'static str) + Send + Sync, -) -> Result, Error> { +) -> Result, Error> { let sub_path = sub_path.as_ref(); let location_path = maybe_missing(&location.path, "location.path") .map(PathBuf::from) .map(Arc::new) - .map_err(IndexerError::from)?; + .map_err(indexer::Error::from)?; let to_walk_path = Arc::new( get_full_path_from_sub_path(location.id, &Some(sub_path), &*location_path, &db) .await - .map_err(IndexerError::from)?, + .map_err(indexer::Error::from)?, ); let Some(WalkTaskOutput { @@ -135,7 +134,7 @@ async fn walk( .map(|rule| IndexerRule::try_from(&rule.indexer_rule)) .collect::, _>>() .map(IndexerRuler::new) - .map_err(IndexerError::from)?, + .map_err(indexer::Error::from)?, IsoFilePathFactory { location_id: location.id, location_path, diff --git a/core/crates/heavy-lifting/src/indexer/tasks/saver.rs b/core/crates/heavy-lifting/src/indexer/tasks/saver.rs index 715fc770c81e..3c97b01ad76e 100644 --- a/core/crates/heavy-lifting/src/indexer/tasks/saver.rs +++ b/core/crates/heavy-lifting/src/indexer/tasks/saver.rs @@ -1,4 +1,4 @@ -use crate::{indexer::IndexerError, Error}; +use crate::{indexer, Error}; use sd_core_file_path_helper::IsolatedFilePathDataParts; use sd_core_sync::Manager as SyncManager; @@ -234,7 +234,7 @@ impl Task for SaveTask { ), ) .await - .map_err(IndexerError::from)? as u64; + .map_err(indexer::Error::from)? 
as u64; trace!("Inserted {saved_count} records"); diff --git a/core/crates/heavy-lifting/src/indexer/tasks/updater.rs b/core/crates/heavy-lifting/src/indexer/tasks/updater.rs index f4cf0d7fd780..e547ec8ac7b8 100644 --- a/core/crates/heavy-lifting/src/indexer/tasks/updater.rs +++ b/core/crates/heavy-lifting/src/indexer/tasks/updater.rs @@ -1,4 +1,4 @@ -use crate::{indexer::IndexerError, Error}; +use crate::{indexer, Error}; use sd_core_file_path_helper::IsolatedFilePathDataParts; use sd_core_sync::Manager as SyncManager; @@ -222,7 +222,7 @@ impl Task for UpdateTask { (sync_stuff.into_iter().flatten().collect(), paths_to_update), ) .await - .map_err(IndexerError::from)?; + .map_err(indexer::Error::from)?; trace!("Updated {updated:?} records"); @@ -240,7 +240,7 @@ async fn fetch_objects_ids_to_unlink( walked_entries: &[WalkedEntry], object_ids_that_should_be_unlinked: &mut HashSet, db: &PrismaClient, -) -> Result<(), IndexerError> { +) -> Result<(), indexer::Error> { if object_ids_that_should_be_unlinked.is_empty() { // First we consult which file paths we should unlink let object_ids = walked_entries diff --git a/core/crates/heavy-lifting/src/indexer/tasks/walker.rs b/core/crates/heavy-lifting/src/indexer/tasks/walker.rs index 3ed771e2eb3c..238d8ab52aa4 100644 --- a/core/crates/heavy-lifting/src/indexer/tasks/walker.rs +++ b/core/crates/heavy-lifting/src/indexer/tasks/walker.rs @@ -1,7 +1,4 @@ -use crate::{ - indexer::{IndexerError, NonCriticalIndexerError}, - Error, NonCriticalJobError, -}; +use crate::{indexer, Error, NonCriticalError}; use sd_core_file_path_helper::{FilePathError, FilePathMetadata, IsolatedFilePathData}; use sd_core_indexer_rules::{IndexerRuler, MetadataForIndexerRules, RuleKind}; @@ -111,13 +108,14 @@ pub trait WalkerDBProxy: Clone + Send + Sync + fmt::Debug + 'static { fn fetch_file_paths( &self, found_paths: Vec, - ) -> impl Future, IndexerError>> + Send; + ) -> impl Future, indexer::Error>> + Send; fn fetch_file_paths_to_remove( &self, parent_iso_file_path: &IsolatedFilePathData<'_>, unique_location_id_materialized_path_name_extension_params: Vec, - ) -> impl Future, NonCriticalIndexerError>> + Send; + ) -> impl Future, indexer::NonCriticalError>> + + Send; } #[derive(Debug, Serialize, Deserialize)] @@ -141,7 +139,7 @@ pub struct WalkTaskOutput { pub to_update: Vec, pub to_remove: Vec, pub accepted_ancestors: HashSet, - pub errors: Vec, + pub errors: Vec, pub directory_iso_file_path: IsolatedFilePathData<'static>, pub total_size: u64, pub handles: Vec>, @@ -160,7 +158,7 @@ struct InnerMetadata { } impl InnerMetadata { - fn new(path: impl AsRef, metadata: &Metadata) -> Result { + fn new(path: impl AsRef, metadata: &Metadata) -> Result { let FilePathMetadata { inode, size_in_bytes, @@ -168,7 +166,7 @@ impl InnerMetadata { modified_at, hidden, } = FilePathMetadata::from_path(path, metadata) - .map_err(|e| NonCriticalIndexerError::FilePathMetadata(e.to_string()))?; + .map_err(|e| indexer::NonCriticalError::FilePathMetadata(e.to_string()))?; Ok(Self { is_dir: metadata.is_dir(), @@ -237,7 +235,7 @@ struct WalkDirSaveState { root: Arc, entry_iso_file_path: IsolatedFilePathData<'static>, stage: WalkerStageSaveState, - errors: Vec, + errors: Vec, scan_time: Duration, is_shallow: bool, } @@ -367,7 +365,7 @@ where db_proxy: DBProxy, stage: WalkerStage, maybe_dispatcher: Option, - errors: Vec, + errors: Vec, scan_time: Duration, is_shallow: bool, } @@ -385,7 +383,7 @@ where iso_file_path_factory: IsoPathFactory, db_proxy: DBProxy, dispatcher: Dispatcher, - ) -> Result { + ) -> 
Result { let entry = entry.into(); Ok(Self { id: TaskId::new_v4(), @@ -415,7 +413,7 @@ where indexer_ruler: IndexerRuler, iso_file_path_factory: IsoPathFactory, db_proxy: DBProxy, - ) -> Result { + ) -> Result { let entry = entry.into(); Ok(Self { id: TaskId::new_v4(), @@ -545,7 +543,7 @@ where *stage = WalkerStage::Walking { read_dir_stream: ReadDirStream::new(fs::read_dir(&path).await.map_err( |e| { - IndexerError::FileIO( + indexer::Error::FileIO( (&path, e, "Failed to open directory to read its entries") .into(), ) @@ -565,8 +563,8 @@ where found_paths.push(dir_entry.path()); } Err(e) => { - errors.push(NonCriticalJobError::Indexer( - NonCriticalIndexerError::FailedDirectoryEntry( + errors.push(NonCriticalError::Indexer( + indexer::NonCriticalError::FailedDirectoryEntry( FileIOError::from((&path, e)).to_string(), ), )); @@ -709,7 +707,7 @@ where async fn segregate_creates_and_updates( walking_entries: &mut Vec, db_proxy: &impl WalkerDBProxy, -) -> Result<(Vec, Vec, u64), IndexerError> { +) -> Result<(Vec, Vec, u64), Error> { if walking_entries.is_empty() { Ok((vec![], vec![], 0)) } else { @@ -791,7 +789,7 @@ async fn keep_walking( db_proxy: &impl WalkerDBProxy, maybe_to_keep_walking: &mut Option>, dispatcher: &Option>, - errors: &mut Vec, + errors: &mut Vec, ) -> Vec> { if let (Some(dispatcher), Some(to_keep_walking)) = (dispatcher, maybe_to_keep_walking) { dispatcher @@ -807,7 +805,7 @@ async fn keep_walking( db_proxy.clone(), dispatcher.clone(), ) - .map_err(|e| NonCriticalIndexerError::DispatchKeepWalking(e.to_string())) + .map_err(|e| indexer::NonCriticalError::DispatchKeepWalking(e.to_string())) }) .filter_map(|res| res.map_err(|e| errors.push(e.into())).ok()), ) @@ -819,7 +817,7 @@ async fn keep_walking( async fn collect_metadata( found_paths: &mut Vec, - errors: &mut Vec, + errors: &mut Vec, ) -> HashMap { found_paths .drain(..) 
@@ -827,7 +825,7 @@ async fn collect_metadata( fs::metadata(¤t_path) .await .map_err(|e| { - NonCriticalIndexerError::Metadata( + indexer::NonCriticalError::Metadata( FileIOError::from((¤t_path, e)).to_string(), ) }) @@ -847,7 +845,7 @@ async fn collect_metadata( async fn apply_indexer_rules( paths_and_metadatas: &mut HashMap, indexer_ruler: &IndexerRuler, - errors: &mut Vec, + errors: &mut Vec, ) -> HashMap>)> { paths_and_metadatas .drain() @@ -860,7 +858,7 @@ async fn apply_indexer_rules( .map(|acceptance_per_rule_kind| { (current_path, (metadata, acceptance_per_rule_kind)) }) - .map_err(|e| NonCriticalIndexerError::IndexerRule(e.to_string())) + .map_err(|e| indexer::NonCriticalError::IndexerRule(e.to_string())) }) .collect::>() .join() @@ -879,7 +877,7 @@ async fn process_rules_results( (InnerMetadata, HashMap>), >, maybe_to_keep_walking: &mut Option>, - errors: &mut Vec, + errors: &mut Vec, ) -> (HashMap, HashSet) { let root = root.as_ref(); @@ -951,7 +949,7 @@ async fn process_rules_results( fs::metadata(&ancestor_path) .await .map_err(|e| { - NonCriticalIndexerError::Metadata( + indexer::NonCriticalError::Metadata( FileIOError::from((&ancestor_path, e)).to_string(), ) }) @@ -964,7 +962,7 @@ async fn process_rules_results( } .into() }) - .map_err(|e| NonCriticalIndexerError::FilePathMetadata(e.to_string())) + .map_err(|e| indexer::NonCriticalError::FilePathMetadata(e.to_string())) }) }) .collect::>() @@ -1023,7 +1021,7 @@ fn accept_ancestors( accepted: &mut HashMap, iso_file_path_factory: &impl IsoFilePathFactory, accepted_ancestors: &mut HashMap, PathBuf>, - errors: &mut Vec, + errors: &mut Vec, ) { // If the ancestors directories wasn't indexed before, now we do for ancestor in current_path @@ -1033,7 +1031,7 @@ fn accept_ancestors( { if let Ok(iso_file_path) = iso_file_path_factory .build(ancestor, true) - .map_err(|e| errors.push(NonCriticalIndexerError::IsoFilePath(e.to_string()).into())) + .map_err(|e| errors.push(indexer::NonCriticalError::IsoFilePath(e.to_string()).into())) { match accepted_ancestors.entry(iso_file_path) { Entry::Occupied(_) => { @@ -1083,7 +1081,7 @@ async fn gather_file_paths_to_remove( entry_iso_file_path: &IsolatedFilePathData<'_>, iso_file_path_factory: &impl IsoFilePathFactory, db_proxy: &impl WalkerDBProxy, - errors: &mut Vec, + errors: &mut Vec, ) -> (Vec, Vec) { let (walking, to_delete_params) = accepted_paths .drain() @@ -1102,7 +1100,7 @@ async fn gather_file_paths_to_remove( ) }) .map_err(|e| { - errors.push(NonCriticalIndexerError::IsoFilePath(e.to_string()).into()); + errors.push(indexer::NonCriticalError::IsoFilePath(e.to_string()).into()); }) .ok() }) @@ -1158,7 +1156,7 @@ mod tests { async fn fetch_file_paths( &self, _: Vec, - ) -> Result, IndexerError> { + ) -> Result, indexer::Error> { Ok(vec![]) } @@ -1166,7 +1164,7 @@ mod tests { &self, _: &IsolatedFilePathData<'_>, _: Vec, - ) -> Result, NonCriticalIndexerError> { + ) -> Result, indexer::NonCriticalError> { Ok(vec![]) } } diff --git a/core/crates/heavy-lifting/src/job_system/job.rs b/core/crates/heavy-lifting/src/job_system/job.rs index 191d71148cea..c8a0db09d3b4 100644 --- a/core/crates/heavy-lifting/src/job_system/job.rs +++ b/core/crates/heavy-lifting/src/job_system/job.rs @@ -1,4 +1,4 @@ -use crate::{Error, NonCriticalJobError}; +use crate::{Error, NonCriticalError}; use sd_core_sync::Manager as SyncManager; @@ -144,7 +144,7 @@ where pub struct JobReturn { data: JobOutputData, metadata: Option, - non_critical_errors: Vec, + non_critical_errors: Vec, } impl JobReturn { @@ -185,7 +185,7 
@@ impl JobReturnBuilder { } #[must_use] - pub fn with_non_critical_errors(mut self, errors: Vec) -> Self { + pub fn with_non_critical_errors(mut self, errors: Vec) -> Self { if self.job_return.non_critical_errors.is_empty() { self.job_return.non_critical_errors = errors; } else { @@ -207,7 +207,7 @@ pub struct JobOutput { job_name: JobName, data: JobOutputData, metadata: Vec, - non_critical_errors: Vec, + non_critical_errors: Vec, } impl JobOutput { diff --git a/core/crates/heavy-lifting/src/lib.rs b/core/crates/heavy-lifting/src/lib.rs index 1cc079f8d56b..935e65e0bfd6 100644 --- a/core/crates/heavy-lifting/src/lib.rs +++ b/core/crates/heavy-lifting/src/lib.rs @@ -36,11 +36,9 @@ use thiserror::Error; pub mod file_identifier; pub mod indexer; pub mod job_system; +pub mod media_processor; pub mod utils; -use file_identifier::{FileIdentifierError, NonCriticalFileIdentifierError}; -use indexer::{IndexerError, NonCriticalIndexerError}; - pub use job_system::{ job::{IntoJob, JobBuilder, JobContext, JobName, JobOutput, JobOutputData, ProgressUpdate}, JobId, JobSystem, @@ -49,9 +47,11 @@ pub use job_system::{ #[derive(Error, Debug)] pub enum Error { #[error(transparent)] - Indexer(#[from] IndexerError), + Indexer(#[from] indexer::Error), + #[error(transparent)] + FileIdentifier(#[from] file_identifier::Error), #[error(transparent)] - FileIdentifier(#[from] FileIdentifierError), + MediaProcessor(#[from] media_processor::Error), #[error(transparent)] TaskSystem(#[from] TaskSystemError), @@ -62,6 +62,7 @@ impl From for rspc::Error { match e { Error::Indexer(e) => e.into(), Error::FileIdentifier(e) => e.into(), + Error::MediaProcessor(e) => e.into(), Error::TaskSystem(e) => { Self::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e) } @@ -70,12 +71,12 @@ impl From for rspc::Error { } #[derive(thiserror::Error, Debug, Serialize, Deserialize, Type)] -pub enum NonCriticalJobError { +pub enum NonCriticalError { // TODO: Add variants as needed #[error(transparent)] - Indexer(#[from] NonCriticalIndexerError), + Indexer(#[from] indexer::NonCriticalError), #[error(transparent)] - FileIdentifier(#[from] NonCriticalFileIdentifierError), + FileIdentifier(#[from] file_identifier::NonCriticalError), } #[repr(i32)] diff --git a/core/crates/heavy-lifting/src/media_processor/mod.rs b/core/crates/heavy-lifting/src/media_processor/mod.rs new file mode 100644 index 000000000000..f7de0e5095d5 --- /dev/null +++ b/core/crates/heavy-lifting/src/media_processor/mod.rs @@ -0,0 +1,20 @@ +use sd_utils::error::FileIOError; + +mod tasks; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + // Internal errors + #[error("database error: {0}")] + Database(#[from] prisma_client_rust::QueryError), + #[error(transparent)] + FileIO(#[from] FileIOError), + #[error(transparent)] + MediaData(#[from] sd_media_metadata::Error), +} + +impl From for rspc::Error { + fn from(e: Error) -> Self { + Self::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e) + } +} diff --git a/core/crates/heavy-lifting/src/media_processor/tasks/media_data_extractor.rs b/core/crates/heavy-lifting/src/media_processor/tasks/media_data_extractor.rs new file mode 100644 index 000000000000..c5c089f9dbc2 --- /dev/null +++ b/core/crates/heavy-lifting/src/media_processor/tasks/media_data_extractor.rs @@ -0,0 +1,27 @@ +use crate::Error; + +use sd_task_system::{ExecStatus, Interrupter, Task, TaskId}; + +#[derive(Debug)] +pub struct MediaDataExtractor { + id: TaskId, +} + +impl MediaDataExtractor { + pub fn new() -> Self { + Self { + id: 
TaskId::new_v4(), + } + } +} + +#[async_trait::async_trait] +impl Task for MediaDataExtractor { + fn id(&self) -> TaskId { + self.id + } + + async fn run(&mut self, interrupter: &Interrupter) -> Result { + Ok(ExecStatus::Canceled) + } +} diff --git a/core/crates/heavy-lifting/src/media_processor/tasks/mod.rs b/core/crates/heavy-lifting/src/media_processor/tasks/mod.rs new file mode 100644 index 000000000000..ae250876917c --- /dev/null +++ b/core/crates/heavy-lifting/src/media_processor/tasks/mod.rs @@ -0,0 +1,76 @@ +mod media_data_extractor; + +use sd_media_metadata::ImageMetadata; +use sd_prisma::prisma::media_data; +use sd_sync::option_sync_db_entry; +use sd_utils::chain_optional_iter; + +use serde::{de::DeserializeOwned, Serialize}; + +pub fn media_data_image_to_query( + mdi: ImageMetadata, + object_id: media_data::object_id::Type, +) -> media_data::CreateUnchecked { + media_data::CreateUnchecked { + object_id, + _params: vec![ + media_data::camera_data::set(serde_json::to_vec(&mdi.camera_data).ok()), + media_data::media_date::set(serde_json::to_vec(&mdi.date_taken).ok()), + media_data::resolution::set(serde_json::to_vec(&mdi.resolution).ok()), + media_data::media_location::set(serde_json::to_vec(&mdi.location).ok()), + media_data::artist::set(mdi.artist), + media_data::description::set(mdi.description), + media_data::copyright::set(mdi.copyright), + media_data::exif_version::set(mdi.exif_version), + media_data::epoch_time::set(mdi.date_taken.map(|x| x.unix_timestamp())), + ], + } +} + +pub fn media_data_image_to_query_params( + mdi: ImageMetadata, +) -> (Vec<(&'static str, rmpv::Value)>, Vec) { + chain_optional_iter( + [], + [ + option_sync_db_entry!( + serde_json::to_vec(&mdi.camera_data).ok(), + media_data::camera_data + ), + option_sync_db_entry!( + serde_json::to_vec(&mdi.date_taken).ok(), + media_data::media_date + ), + option_sync_db_entry!( + serde_json::to_vec(&mdi.location).ok(), + media_data::media_location + ), + option_sync_db_entry!(mdi.artist, media_data::artist), + option_sync_db_entry!(mdi.description, media_data::description), + option_sync_db_entry!(mdi.copyright, media_data::copyright), + option_sync_db_entry!(mdi.exif_version, media_data::exif_version), + ], + ) + .into_iter() + .unzip() +} + +pub fn media_data_image_from_prisma_data(data: media_data::Data) -> ImageMetadata { + ImageMetadata { + camera_data: from_slice_option_to_option(data.camera_data).unwrap_or_default(), + date_taken: from_slice_option_to_option(data.media_date).unwrap_or_default(), + resolution: from_slice_option_to_option(data.resolution).unwrap_or_default(), + location: from_slice_option_to_option(data.media_location), + artist: data.artist, + description: data.description, + copyright: data.copyright, + exif_version: data.exif_version, + } +} + +#[inline] +fn from_slice_option_to_option( + value: Option>, +) -> Option { + value.map_or_else(Default::default, |x| serde_json::from_slice(&x).ok()) +} diff --git a/core/crates/heavy-lifting/src/utils/sub_path.rs b/core/crates/heavy-lifting/src/utils/sub_path.rs index 6461ccdb7f44..7f489890ae3b 100644 --- a/core/crates/heavy-lifting/src/utils/sub_path.rs +++ b/core/crates/heavy-lifting/src/utils/sub_path.rs @@ -11,7 +11,7 @@ use std::path::{Path, PathBuf}; use prisma_client_rust::QueryError; #[derive(thiserror::Error, Debug)] -pub enum SubPathError { +pub enum Error { #[error("received sub path not in database: ", .0.display())] SubPathNotFound(Box), @@ -22,10 +22,10 @@ pub enum SubPathError { IsoFilePath(#[from] FilePathError), } -impl From for 
rspc::Error { - fn from(err: SubPathError) -> Self { +impl From for rspc::Error { + fn from(err: Error) -> Self { match err { - SubPathError::SubPathNotFound(_) => { + Error::SubPathNotFound(_) => { Self::with_cause(ErrorCode::NotFound, err.to_string(), err) } @@ -39,7 +39,7 @@ pub async fn get_full_path_from_sub_path( sub_path: &Option + Send + Sync>, location_path: impl AsRef + Send, db: &PrismaClient, -) -> Result { +) -> Result { let location_path = location_path.as_ref(); match sub_path { @@ -53,7 +53,7 @@ pub async fn get_full_path_from_sub_path( sub_path, &IsolatedFilePathData::new(location_id, location_path, &full_path, true)?, db, - SubPathError::SubPathNotFound, + Error::SubPathNotFound, ) .await?; @@ -68,7 +68,7 @@ pub async fn maybe_get_iso_file_path_from_sub_path( sub_path: &Option + Send + Sync>, location_path: impl AsRef + Send, db: &PrismaClient, -) -> Result>, SubPathError> { +) -> Result>, Error> { let location_path = location_path.as_ref(); match sub_path { @@ -83,7 +83,7 @@ pub async fn maybe_get_iso_file_path_from_sub_path( sub_path, &sub_iso_file_path, db, - SubPathError::SubPathNotFound, + Error::SubPathNotFound, ) .await .map(|()| Some(sub_iso_file_path)) diff --git a/crates/p2p-block/src/sb_request.rs b/crates/p2p-block/src/sb_request.rs index a119b292323a..1384ca5ed898 100644 --- a/crates/p2p-block/src/sb_request.rs +++ b/crates/p2p-block/src/sb_request.rs @@ -198,7 +198,7 @@ mod tests { async fn test_spaceblock_requests_empty() { let req = SpaceblockRequests { id: Uuid::new_v4(), - block_size: BlockSize::from_size(42069), + block_size: BlockSize::from_file_size(42069), requests: vec![], };
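
Notes on the patterns this patch introduces. The core of the refactor is renaming the per-subsystem error enums (`FileIdentifierError`, `IndexerError`, `NonCriticalIndexerError`, `SubPathError`, ...) to plain `Error` / `NonCriticalError` scoped by their module, and rolling them up into the crate-level `Error` and `NonCriticalError` through `#[from]`. The sketch below is a trimmed-down mock of that layering, not the crate's real definitions; it only shows why `?` and `.into()` keep working at call sites after the rename.

```rust
use thiserror::Error;

// Mock module-scoped enums; in the crate these live in `indexer`,
// `file_identifier` and `media_processor` and carry many more variants.
mod indexer {
    #[derive(thiserror::Error, Debug)]
    pub enum Error {
        #[error("indexer rule not found: {0}")]
        IndexerRuleNotFound(i32),
    }

    #[derive(thiserror::Error, Debug)]
    pub enum NonCriticalError {
        #[error("failed to fetch metadata: {0}")]
        Metadata(String),
    }
}

// Crate-level enums aggregate the module-scoped ones with `#[from]`.
#[derive(Error, Debug)]
pub enum Error {
    #[error(transparent)]
    Indexer(#[from] indexer::Error),
}

#[derive(Error, Debug)]
pub enum NonCriticalError {
    #[error(transparent)]
    Indexer(#[from] indexer::NonCriticalError),
}

fn fetch_rule(id: i32) -> Result<(), indexer::Error> {
    Err(indexer::Error::IndexerRuleNotFound(id))
}

fn walk() -> Result<(), Error> {
    // `?` converts indexer::Error into the crate-level Error via `#[from]`.
    fetch_rule(42)?;
    Ok(())
}

fn main() {
    // Non-critical errors are collected into a Vec and attached to the job
    // output (see `JobReturnBuilder::with_non_critical_errors` above).
    let mut errors: Vec<NonCriticalError> = Vec::new();
    errors.push(indexer::NonCriticalError::Metadata("permission denied".into()).into());

    assert!(walk().is_err());
    assert_eq!(errors.len(), 1);
}
```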
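In `media_data_image_to_query_params`, combining `chain_optional_iter` with `option_sync_db_entry!` means only the EXIF fields that are actually present become sync entries and DB set-params; absent fields simply contribute nothing. Below is a hedged, self-contained mock of that filtering behaviour. The real helper lives in `sd-utils`, and this signature is inferred from the call site rather than copied from it.

```rust
// Mock of the sd_utils::chain_optional_iter pattern: required entries are
// kept as-is, and only the `Some` optional entries survive.
fn chain_optional_iter<T>(
    required: impl IntoIterator<Item = T>,
    optional: impl IntoIterator<Item = Option<T>>,
) -> Vec<T> {
    required.into_iter().map(Some).chain(optional).flatten().collect()
}

fn main() {
    let params = chain_optional_iter(
        ["object_id"],
        [
            Some("artist"),
            None, // e.g. no copyright tag in this file
            Some("exif_version"),
        ],
    );
    assert_eq!(params, vec!["object_id", "artist", "exif_version"]);
}
```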
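Going the other way, `media_data_image_from_prisma_data` relies on `from_slice_option_to_option` to stay tolerant of missing or corrupt JSON columns: both cases collapse to `None` (and then to the field's default where `unwrap_or_default` is used). A small sketch of that behaviour follows; the generic bound is an assumption, since the diff elides it.

```rust
use serde::de::DeserializeOwned;

// Sketch of the helper added in tasks/mod.rs: a missing column or a blob that
// fails to deserialize both yield `None`. (Bound assumed; not taken verbatim.)
fn from_slice_option_to_option<T: DeserializeOwned>(value: Option<Vec<u8>>) -> Option<T> {
    value.map_or_else(Default::default, |x| serde_json::from_slice(&x).ok())
}

fn main() {
    let present = Some(serde_json::to_vec(&1920u32).unwrap());
    let missing: Option<Vec<u8>> = None;
    let corrupt = Some(b"not json".to_vec());

    assert_eq!(from_slice_option_to_option::<u32>(present), Some(1920));
    assert_eq!(from_slice_option_to_option::<u32>(missing), None);
    assert_eq!(from_slice_option_to_option::<u32>(corrupt), None);
}
```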