Moving stuff around
fogodev committed Apr 25, 2024
1 parent 4493372 commit b3b4050
Showing 20 changed files with 269 additions and 142 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion core/crates/heavy-lifting/Cargo.toml
@@ -17,12 +17,12 @@ sd-core-sync = { path = "../sync" }

# Sub-crates
sd-file-ext = { path = "../../../crates/file-ext" }
sd-media-metadata = { path = "../../../crates/media-metadata" }
sd-prisma = { path = "../../../crates/prisma" }
sd-sync = { path = "../../../crates/sync" }
sd-task-system = { path = "../../../crates/task-system" }
sd-utils = { path = "../../../crates/utils" }


async-channel = { workspace = true }
async-trait = { workspace = true }
blake3 = { workspace = true }
19 changes: 10 additions & 9 deletions core/crates/heavy-lifting/src/file_identifier/job.rs
@@ -1,12 +1,13 @@
use crate::{
+ file_identifier,
job_system::{
job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus},
report::ReportOutputMetadata,
utils::cancel_pending_tasks,
SerializableJob, SerializedTasks,
},
utils::sub_path::maybe_get_iso_file_path_from_sub_path,
- Error, JobContext, JobName, LocationScanState, NonCriticalJobError, ProgressUpdate,
+ Error, JobContext, JobName, LocationScanState, NonCriticalError, ProgressUpdate,
};

use sd_core_file_path_helper::IsolatedFilePathData;
@@ -41,7 +42,7 @@ use super::{
ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, ObjectProcessorTask,
ObjectProcessorTaskMetrics,
},
- FileIdentifierError, CHUNK_SIZE,
+ CHUNK_SIZE,
};

#[derive(Debug)]
@@ -52,7 +53,7 @@ pub struct FileIdentifierJob {

metadata: Metadata,

- errors: Vec<NonCriticalJobError>,
+ errors: Vec<NonCriticalError>,

pending_tasks_on_resume: Vec<TaskHandle<Error>>,
tasks_for_shutdown: Vec<Box<dyn Task<Error>>>,
@@ -79,7 +80,7 @@ impl Job for FileIdentifierJob {
self.pending_tasks_on_resume = dispatcher
.dispatch_many_boxed(
rmp_serde::from_slice::<Vec<(TaskKind, Vec<u8>)>>(&serialized_tasks)
- .map_err(FileIdentifierError::from)?
+ .map_err(file_identifier::Error::from)?
.into_iter()
.map(|(task_kind, task_bytes)| async move {
match task_kind {
@@ -103,7 +104,7 @@ impl Job for FileIdentifierJob {
.collect::<Vec<_>>()
.try_join()
.await
- .map_err(FileIdentifierError::from)?,
+ .map_err(file_identifier::Error::from)?,
)
.await;

@@ -181,7 +182,7 @@
)
.exec()
.await
- .map_err(FileIdentifierError::from)?;
+ .map_err(file_identifier::Error::from)?;

Ok(ReturnStatus::Completed(
JobReturn::builder()
@@ -196,7 +197,7 @@ impl FileIdentifierJob {
pub fn new(
location: location::Data,
sub_path: Option<PathBuf>,
- ) -> Result<Self, FileIdentifierError> {
+ ) -> Result<Self, file_identifier::Error> {
Ok(Self {
location_path: maybe_missing(&location.path, "location.path")
.map(PathBuf::from)
@@ -215,7 +216,7 @@
pending_running_tasks: &mut FuturesUnordered<TaskHandle<Error>>,
job_ctx: &impl JobContext,
dispatcher: &JobTaskDispatcher,
- ) -> Result<(), FileIdentifierError> {
+ ) -> Result<(), file_identifier::Error> {
// if we don't have any pending task, then this is a fresh job
if self.pending_tasks_on_resume.is_empty() {
let db = job_ctx.db();
@@ -399,7 +400,7 @@ struct SaveState {

metadata: Metadata,

- errors: Vec<NonCriticalJobError>,
+ errors: Vec<NonCriticalError>,

tasks_for_shutdown_bytes: Option<SerializedTasks>,
}
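The recurring `.map_err(file_identifier::Error::from)?` above is the conversion chain this refactor standardizes on: a concrete failure is lifted into the module-level `Error`, and `?` then promotes it into the crate-wide `Error`. A minimal, self-contained sketch of that chain, assuming only the thiserror crate (the variant names and the `ParseIntError` source are stand-ins, not the crate's real types):

```rust
mod file_identifier {
    // Module-scoped error, referred to as `file_identifier::Error` by callers.
    #[derive(thiserror::Error, Debug)]
    pub enum Error {
        #[error("failed to parse: {0}")]
        Parse(#[from] std::num::ParseIntError),
    }
}

// Crate-wide error; `#[from]` lets `?` convert module errors into it.
#[derive(thiserror::Error, Debug)]
enum Error {
    #[error(transparent)]
    FileIdentifier(#[from] file_identifier::Error),
}

fn run() -> Result<i32, Error> {
    // Same shape as the diff: map the source error into the module error,
    // then let `?` promote it to the crate-wide one.
    let n = "42".parse::<i32>().map_err(file_identifier::Error::from)?;
    Ok(n)
}

fn main() {
    println!("{:?}", run()); // Ok(42)
}
```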
14 changes: 7 additions & 7 deletions core/crates/heavy-lifting/src/file_identifier/mod.rs
@@ -1,4 +1,4 @@
- use crate::utils::sub_path::SubPathError;
+ use crate::utils::sub_path;

use sd_core_file_path_helper::{FilePathError, IsolatedFilePathData};

@@ -28,7 +28,7 @@ pub use shallow::shallow;
const CHUNK_SIZE: usize = 100;

#[derive(thiserror::Error, Debug)]
- pub enum FileIdentifierError {
+ pub enum Error {
#[error("missing field on database: {0}")]
MissingField(#[from] MissingFieldError),
#[error("failed to deserialized stored tasks for job resume: {0}")]
@@ -39,21 +39,21 @@ pub enum FileIdentifierError {
#[error(transparent)]
FilePathError(#[from] FilePathError),
#[error(transparent)]
- SubPath(#[from] SubPathError),
+ SubPath(#[from] sub_path::Error),
}

- impl From<FileIdentifierError> for rspc::Error {
- fn from(err: FileIdentifierError) -> Self {
+ impl From<Error> for rspc::Error {
+ fn from(err: Error) -> Self {
match err {
- FileIdentifierError::SubPath(sub_path_err) => sub_path_err.into(),
+ Error::SubPath(sub_path_err) => sub_path_err.into(),

_ => Self::with_cause(ErrorCode::InternalServerError, err.to_string(), err),
}
}
}

#[derive(thiserror::Error, Debug, Serialize, Deserialize, Type)]
- pub enum NonCriticalFileIdentifierError {
+ pub enum NonCriticalError {
#[error("failed to extract file metadata: {0}")]
FailedToExtractFileMetadata(String),
#[cfg(target_os = "windows")]
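One detail worth noting in the `From<Error> for rspc::Error` impl above: the `SubPath` variant defers to the inner error's own conversion instead of flattening everything into an internal server error. A small sketch of that delegation pattern, with a hypothetical `ApiError` standing in for `rspc::Error` and simplified error types:

```rust
// Hypothetical stand-ins for sub_path::Error and rspc::Error.
#[derive(Debug)]
struct SubPathError(String);

#[derive(Debug)]
enum ApiError {
    NotFound(String),
    Internal(String),
}

// The inner error carries its own, more precise mapping to the API error.
impl From<SubPathError> for ApiError {
    fn from(e: SubPathError) -> Self {
        ApiError::NotFound(e.0)
    }
}

#[derive(Debug)]
enum Error {
    SubPath(SubPathError),
    Database(String),
}

impl From<Error> for ApiError {
    fn from(err: Error) -> Self {
        match err {
            // Delegate: reuse the inner conversion rather than discarding it.
            Error::SubPath(e) => e.into(),
            Error::Database(msg) => ApiError::Internal(msg),
        }
    }
}

fn main() {
    let api: ApiError = Error::SubPath(SubPathError("missing dir".into())).into();
    println!("{api:?}"); // NotFound("missing dir")
}
```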
19 changes: 11 additions & 8 deletions core/crates/heavy-lifting/src/file_identifier/shallow.rs
@@ -1,4 +1,7 @@
- use crate::{utils::sub_path::maybe_get_iso_file_path_from_sub_path, Error, NonCriticalJobError};
+ use crate::{
+ file_identifier, utils::sub_path::maybe_get_iso_file_path_from_sub_path, Error,
+ NonCriticalError,
+ };

use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_for_file_identifier;
@@ -22,7 +25,7 @@ use tracing::{debug, warn};

use super::{
tasks::{ExtractFileMetadataTask, ExtractFileMetadataTaskOutput, ObjectProcessorTask},
- FileIdentifierError, CHUNK_SIZE,
+ CHUNK_SIZE,
};

pub async fn shallow(
@@ -32,24 +35,24 @@ pub async fn shallow(
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
invalidate_query: impl Fn(&'static str) + Send + Sync,
- ) -> Result<Vec<NonCriticalJobError>, Error> {
+ ) -> Result<Vec<NonCriticalError>, Error> {
let sub_path = sub_path.as_ref();

let location_path = maybe_missing(&location.path, "location.path")
.map(PathBuf::from)
.map(Arc::new)
- .map_err(FileIdentifierError::from)?;
+ .map_err(file_identifier::Error::from)?;

let location = Arc::new(location);

let sub_iso_file_path =
maybe_get_iso_file_path_from_sub_path(location.id, &Some(sub_path), &*location_path, &db)
.await
- .map_err(FileIdentifierError::from)?
+ .map_err(file_identifier::Error::from)?
.map_or_else(
|| {
IsolatedFilePathData::new(location.id, &*location_path, &*location_path, true)
- .map_err(FileIdentifierError::from)
+ .map_err(file_identifier::Error::from)
},
Ok,
)?;
@@ -74,7 +77,7 @@ pub async fn shallow(
.select(file_path_for_file_identifier::select())
.exec()
.await
- .map_err(FileIdentifierError::from)?;
+ .map_err(file_identifier::Error::from)?;

let Some(last_orphan) = orphan_paths.last() else {
// No orphans here!
@@ -117,7 +120,7 @@ async fn process_tasks(
dispatcher: BaseTaskDispatcher<Error>,
db: Arc<PrismaClient>,
sync: Arc<SyncManager>,
- ) -> Result<Vec<NonCriticalJobError>, Error> {
+ ) -> Result<Vec<NonCriticalError>, Error> {
let mut pending_running_tasks = pending_running_tasks.lend_mut();

let mut errors = vec![];
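The signature `Result<Vec<NonCriticalError>, Error>` in `shallow` and `process_tasks` encodes a two-tier error policy: a critical failure aborts the whole pass via `Err`, while per-file problems are accumulated and returned for reporting. A minimal sketch of that split, with hypothetical helper and variant names:

```rust
#[allow(dead_code)]
#[derive(Debug)]
enum Error {
    // Critical: aborts the whole operation (e.g. a dead DB connection).
    Database(String),
}

#[derive(Debug)]
enum NonCriticalError {
    // Non-critical: recorded, processing continues.
    FailedToExtractFileMetadata(String),
}

// Hypothetical stand-in for processing a single file.
fn extract_metadata(path: &str) -> Result<(), String> {
    if path.ends_with(".locked") {
        Err("permission denied".into())
    } else {
        Ok(())
    }
}

fn shallow(paths: &[&str]) -> Result<Vec<NonCriticalError>, Error> {
    let mut errors = Vec::new();
    for path in paths {
        // A failure on one file is pushed, not propagated.
        if let Err(e) = extract_metadata(path) {
            errors.push(NonCriticalError::FailedToExtractFileMetadata(format!(
                "{path}: {e}"
            )));
        }
    }
    Ok(errors)
}

fn main() {
    println!("{:?}", shallow(&["a.txt", "b.locked"]));
}
```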
core/crates/heavy-lifting/src/file_identifier/tasks/extract_file_metadata.rs
@@ -1,6 +1,6 @@
use crate::{
- file_identifier::{FileMetadata, NonCriticalFileIdentifierError},
- Error, NonCriticalJobError,
+ file_identifier::{self, FileMetadata},
+ Error, NonCriticalError,
};

use sd_core_file_path_helper::IsolatedFilePathData;
@@ -34,15 +34,15 @@ pub struct ExtractFileMetadataTask {
file_paths_by_id: HashMap<Uuid, file_path_for_file_identifier::Data>,
identified_files: HashMap<Uuid, IdentifiedFile>,
extract_metadata_time: Duration,
- errors: Vec<NonCriticalJobError>,
+ errors: Vec<NonCriticalError>,
is_shallow: bool,
}

#[derive(Debug)]
pub struct ExtractFileMetadataTaskOutput {
pub identified_files: HashMap<Uuid, IdentifiedFile>,
pub extract_metadata_time: Duration,
- pub errors: Vec<NonCriticalJobError>,
+ pub errors: Vec<NonCriticalError>,
}

impl ExtractFileMetadataTask {
@@ -207,7 +207,7 @@ fn handle_non_critical_errors(
location_id: location::id::Type,
file_path_pub_id: Uuid,
e: &FileIOError,
- errors: &mut Vec<NonCriticalJobError>,
+ errors: &mut Vec<NonCriticalError>,
) {
error!("Failed to extract file metadata <location_id={location_id}, file_path_pub_id='{file_path_pub_id}'>: {e:#?}");

@@ -218,22 +218,23 @@
// Handle case where file is on-demand (NTFS only)
if e.source.raw_os_error().map_or(false, |code| code == 362) {
errors.push(
- NonCriticalFileIdentifierError::FailedToExtractMetadataFromOnDemandFile(
+ file_identifier::NonCriticalError::FailedToExtractMetadataFromOnDemandFile(
formatted_error,
)
.into(),
);
} else {
errors.push(
- NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(),
+ file_identifier::NonCriticalError::FailedToExtractFileMetadata(formatted_error)
+ .into(),
);
}
}

#[cfg(not(target_os = "windows"))]
{
errors.push(
- NonCriticalFileIdentifierError::FailedToExtractFileMetadata(formatted_error).into(),
+ file_identifier::NonCriticalError::FailedToExtractFileMetadata(formatted_error).into(),
);
}
}
@@ -243,15 +244,15 @@ fn try_iso_file_path_extraction(
file_path_pub_id: Uuid,
file_path: &file_path_for_file_identifier::Data,
location_path: Arc<PathBuf>,
- errors: &mut Vec<NonCriticalJobError>,
+ errors: &mut Vec<NonCriticalError>,
) -> Option<(Uuid, IsolatedFilePathData<'static>, Arc<PathBuf>)> {
IsolatedFilePathData::try_from((location_id, file_path))
.map(IsolatedFilePathData::to_owned)
.map(|iso_file_path| (file_path_pub_id, iso_file_path, location_path))
.map_err(|e| {
error!("Failed to extract isolated file path data: {e:#?}");
errors.push(
- NonCriticalFileIdentifierError::FailedToExtractIsolatedFilePathData(format!(
+ file_identifier::NonCriticalError::FailedToExtractIsolatedFilePathData(format!(
"<file_path_pub_id='{file_path_pub_id}', error={e}>"
))
.into(),
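The Windows-only branch in `handle_non_critical_errors` above keys off the raw OS error code (362) to detect on-demand NTFS files before deciding which non-critical variant to push. A tiny sketch of that check against `std::io::Error`, using only the standard library:

```rust
use std::io;

// io::Error exposes the underlying OS error code; the task above treats
// code 362 as "file is on-demand" (NTFS only) and reports it separately.
fn is_on_demand_error(e: &io::Error) -> bool {
    e.raw_os_error().map_or(false, |code| code == 362)
}

fn main() {
    let e = io::Error::from_raw_os_error(362);
    println!("on-demand: {}", is_on_demand_error(&e)); // on-demand: true
}
```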
core/crates/heavy-lifting/src/file_identifier/tasks/object_processor.rs
@@ -1,4 +1,4 @@
- use crate::{file_identifier::FileIdentifierError, Error};
+ use crate::{file_identifier, Error};

use sd_core_prisma_helpers::{
file_path_for_file_identifier, file_path_pub_id, object_for_file_identifier,
@@ -208,7 +208,7 @@ async fn assign_cas_id_to_file_paths(
identified_files: &HashMap<Uuid, IdentifiedFile>,
db: &PrismaClient,
sync: &SyncManager,
- ) -> Result<(), FileIdentifierError> {
+ ) -> Result<(), file_identifier::Error> {
// Assign cas_id to each file path
sync.write_ops(
db,
@@ -243,7 +243,7 @@ async fn fetch_existing_objects_by_cas_id(
async fn fetch_existing_objects_by_cas_id(
identified_files: &HashMap<Uuid, IdentifiedFile>,
db: &PrismaClient,
- ) -> Result<HashMap<String, object_for_file_identifier::Data>, FileIdentifierError> {
+ ) -> Result<HashMap<String, object_for_file_identifier::Data>, file_identifier::Error> {
// Retrieves objects that are already connected to file paths with the same id
db.object()
.find_many(vec![object::file_paths::some(vec![
@@ -280,7 +280,7 @@ async fn assign_existing_objects_to_file_paths(
objects_by_cas_id: &HashMap<String, object_for_file_identifier::Data>,
db: &PrismaClient,
sync: &SyncManager,
- ) -> Result<Vec<file_path_pub_id::Data>, FileIdentifierError> {
+ ) -> Result<Vec<file_path_pub_id::Data>, file_identifier::Error> {
// Attempt to associate each file path with an object that has been
// connected to file paths with the same cas_id
sync.write_ops(
@@ -341,7 +341,7 @@ async fn create_objects(
identified_files: &HashMap<Uuid, IdentifiedFile>,
db: &PrismaClient,
sync: &SyncManager,
- ) -> Result<u64, FileIdentifierError> {
+ ) -> Result<u64, file_identifier::Error> {
trace!("Creating {} new Objects", identified_files.len(),);

let (object_create_args, file_path_update_args) = identified_files
(The remaining changed files are not shown.)
