Skip to content

Commit

Permalink
Merge branch 'spacedriveapp:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Raghav-45 committed Aug 15, 2023
2 parents de274c3 + 9c0aec8 commit c1bfc32
Show file tree
Hide file tree
Showing 76 changed files with 3,710 additions and 2,397 deletions.
1 change: 1 addition & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
allow-unwrap-in-tests = true
3 changes: 1 addition & 2 deletions .github/scripts/setup-system.sh
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,8 @@ elif [ "$SYSNAME" = "Darwin" ]; then
echo "Download ffmpeg build..."
_page=1
while [ $_page -gt 0 ]; do
# TODO: Filter only actions triggered by the main branch
_success=$(gh_curl "${_gh_url}/${_sd_gh_path}/actions/workflows/ffmpeg-macos.yml/runs?page=${_page}&per_page=100&status=success" \
| jq -r '. as $raw | .workflow_runs | if length == 0 then error("Error: \($raw)") else .[] | .artifacts_url end' \
| jq -r '. as $raw | .workflow_runs | if length == 0 then error("Error: \($raw)") else .[] | select(.head_branch == "main") | .artifacts_url end' \
| while IFS= read -r _artifacts_url; do
if _artifact_path="$(
gh_curl "$_artifacts_url" \
Expand Down
5 changes: 1 addition & 4 deletions core/crates/sync/src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ impl Actor {

let mut timestamp = {
let mut clocks = self.timestamps.write().await;
clocks
.entry(op.instance)
.or_insert_with(|| op.timestamp)
.clone()
*clocks.entry(op.instance).or_insert_with(|| op.timestamp)
};

if timestamp < op.timestamp {
Expand Down
14 changes: 7 additions & 7 deletions core/crates/sync/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ pub struct Manager {
shared: Arc<SharedState>,
}

pub struct SyncManagerNew {
pub manager: Manager,
pub rx: broadcast::Receiver<SyncMessage>,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
pub struct GetOpsArgs {
pub clocks: Vec<(Uuid, NTP64)>,
pub count: u32,
}

pub struct New<T> {
pub manager: T,
pub rx: broadcast::Receiver<SyncMessage>,
}

impl Manager {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> SyncManagerNew {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> New<Self> {
let (tx, rx) = broadcast::channel(64);

let timestamps: Timestamps = Default::default();
Expand All @@ -41,7 +41,7 @@ impl Manager {

let ingest = ingest::Actor::spawn(shared.clone());

SyncManagerNew {
New {
manager: Self { shared, tx, ingest },
rx,
}
Expand Down
7 changes: 3 additions & 4 deletions core/crates/sync/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,14 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {

async move {
while let Ok(msg) = sync_rx1.recv().await {
match msg {
SyncMessage::Created => instance2
if let SyncMessage::Created = msg {
instance2
.sync
.ingest
.event_tx
.send(ingest::Event::Notification)
.await
.unwrap(),
_ => {}
.unwrap()
}
}
}
Expand Down
85 changes: 76 additions & 9 deletions core/src/api/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
erase::FileEraserJobInit,
},
prisma::{file_path, location, object},
util::{db::maybe_missing, error::FileIOError},
};

use std::path::Path;
Expand All @@ -24,8 +25,8 @@ use regex::Regex;
use rspc::{alpha::AlphaRouter, ErrorCode};
use serde::Deserialize;
use specta::Type;
use tokio::fs;
use tracing::error;
use tokio::{fs, io};
use tracing::{error, warn};

use super::{Ctx, R};

Expand Down Expand Up @@ -129,12 +130,12 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})
.procedure("updateAccessTime", {
R.with2(library())
.mutation(|(_, library), id: i32| async move {
.mutation(|(_, library), ids: Vec<i32>| async move {
library
.db
.object()
.update(
object::id::equals(id),
.update_many(
vec![object::id::in_vec(ids)],
vec![object::date_accessed::set(Some(Utc::now().into()))],
)
.exec()
Expand Down Expand Up @@ -176,10 +177,76 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.procedure("deleteFiles", {
R.with2(library())
.mutation(|(node, library), args: FileDeleterJobInit| async move {
Job::new(args)
.spawn(&node, &library)
.await
.map_err(Into::into)
match args.file_path_ids.len() {
0 => Ok(()),
1 => {
let (maybe_location, maybe_file_path) = library
.db
._batch((
library
.db
.location()
.find_unique(location::id::equals(args.location_id))
.select(location::select!({ path })),
library
.db
.file_path()
.find_unique(file_path::id::equals(args.file_path_ids[0]))
.select(file_path_to_isolate::select()),
))
.await?;

let location_path = maybe_missing(
maybe_location
.ok_or(LocationError::IdNotFound(args.location_id))?
.path,
"location.path",
)
.map_err(LocationError::from)?;

let file_path = maybe_file_path.ok_or(LocationError::FilePath(
FilePathError::IdNotFound(args.file_path_ids[0]),
))?;

let full_path = Path::new(&location_path).join(
IsolatedFilePathData::try_from(&file_path)
.map_err(LocationError::MissingField)?,
);

match if maybe_missing(file_path.is_dir, "file_path.is_dir")
.map_err(LocationError::MissingField)?
{
fs::remove_dir_all(&full_path).await
} else {
fs::remove_file(&full_path).await
} {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => {
warn!(
"File not found in the file system, will remove from database: {}",
full_path.display()
);
library
.db
.file_path()
.delete(file_path::id::equals(args.file_path_ids[0]))
.exec()
.await
.map_err(LocationError::from)?;

Ok(())
}
Err(e) => {
Err(LocationError::from(FileIOError::from((full_path, e)))
.into())
}
}
}
_ => Job::new(args)
.spawn(&node, &library)
.await
.map_err(Into::into),
}
})
})
.procedure("eraseFiles", {
Expand Down
4 changes: 2 additions & 2 deletions core/src/api/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}
};

let instant = intervals.entry(progress_event.id).or_insert_with(||
Instant::now()
let instant = intervals.entry(progress_event.id).or_insert_with(
Instant::now
);

if instant.elapsed() <= Duration::from_secs_f64(1.0 / 30.0) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/api/locations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
util::AbortOnDrop,
};

use std::path::{Path, PathBuf};
use std::path::PathBuf;

use rspc::{self, alpha::AlphaRouter, ErrorCode};
use serde::{Deserialize, Serialize};
Expand Down
39 changes: 39 additions & 0 deletions core/src/api/tags.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use chrono::Utc;
use rspc::{alpha::AlphaRouter, ErrorCode};
use sd_prisma::prisma_sync;
Expand Down Expand Up @@ -36,6 +38,42 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.await?)
})
})
.procedure("getWithObjects", {
R.with2(library()).query(
|(_, library), object_ids: Vec<object::id::Type>| async move {
let Library { db, .. } = library.as_ref();

let tags_with_objects = db
.tag()
.find_many(vec![tag::tag_objects::some(vec![
tag_on_object::object_id::in_vec(object_ids.clone()),
])])
.select(tag::select!({
id
tag_objects(vec![tag_on_object::object_id::in_vec(object_ids.clone())]): select {
object: select {
id
}
}
}))
.exec()
.await?;

Ok(tags_with_objects
.into_iter()
.map(|tag| {
(
tag.id,
tag.tag_objects
.into_iter()
.map(|rel| rel.object.id)
.collect::<Vec<_>>(),
)
})
.collect::<BTreeMap<_, _>>())
},
)
})
.procedure("get", {
R.with2(library())
.query(|(_, library), tag_id: i32| async move {
Expand Down Expand Up @@ -137,6 +175,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}

invalidate_query!(library, "tags.getForObject");
invalidate_query!(library, "tags.getWithObjects");

Ok(())
})
Expand Down
32 changes: 16 additions & 16 deletions core/src/api/utils/invalidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,20 @@ impl InvalidRequests {
/// );
/// ```
#[macro_export]
#[allow(clippy::crate_in_macro_def)]
// #[allow(clippy::crate_in_macro_def)]
macro_rules! invalidate_query {
($ctx:expr, $key:literal) => {{
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type
let ctx: &$crate::library::Library = &$ctx; // Assert the context is the correct type

#[cfg(debug_assertions)]
{
#[ctor::ctor]
fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS
$crate::api::utils::INVALIDATION_REQUESTS
.lock()
.unwrap()
.queries
.push(crate::api::utils::InvalidationRequest {
.push($crate::api::utils::InvalidationRequest {
key: $key,
arg_ty: None,
result_ty: None,
Expand All @@ -139,23 +139,23 @@ macro_rules! invalidate_query {
::tracing::trace!(target: "sd_core::invalidate-query", "invalidate_query!(\"{}\") at {}", $key, concat!(file!(), ":", line!()));

// The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit.
ctx.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null, None)
ctx.emit($crate::api::CoreEvent::InvalidateOperation(
$crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null, None)
))
}};
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr $(,)?) => {{
let _: $arg_ty = $arg; // Assert the type the user provided is correct
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type
let ctx: &$crate::library::Library = &$ctx; // Assert the context is the correct type

#[cfg(debug_assertions)]
{
#[ctor::ctor]
fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS
$crate::api::utils::INVALIDATION_REQUESTS
.lock()
.unwrap()
.queries
.push(crate::api::utils::InvalidationRequest {
.push($crate::api::utils::InvalidationRequest {
key: $key,
arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
parent_inline: false,
Expand All @@ -172,8 +172,8 @@ macro_rules! invalidate_query {
// The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit.
let _ = serde_json::to_value($arg)
.map(|v|
ctx.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v, None),
ctx.emit($crate::api::CoreEvent::InvalidateOperation(
$crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v, None),
))
)
.map_err(|_| {
Expand All @@ -182,17 +182,17 @@ macro_rules! invalidate_query {
}};
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr, $result_ty:ty: $result:expr $(,)?) => {{
let _: $arg_ty = $arg; // Assert the type the user provided is correct
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type
let ctx: &$crate::library::Library = &$ctx; // Assert the context is the correct type

#[cfg(debug_assertions)]
{
#[ctor::ctor]
fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS
$crate::api::utils::INVALIDATION_REQUESTS
.lock()
.unwrap()
.queries
.push(crate::api::utils::InvalidationRequest {
.push($crate::api::utils::InvalidationRequest {
key: $key,
arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
parent_inline: false,
Expand All @@ -214,8 +214,8 @@ macro_rules! invalidate_query {
.and_then(|arg|
serde_json::to_value($result)
.map(|result|
ctx.emit(crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, arg, Some(result)),
ctx.emit($crate::api::CoreEvent::InvalidateOperation(
$crate::api::utils::InvalidateOperationEvent::dangerously_create($key, arg, Some(result)),
))
)
)
Expand Down
2 changes: 1 addition & 1 deletion core/src/library/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl Libraries {
identity,
// key_manager,
db,
&node,
node,
Arc::new(sync.manager),
)
.await;
Expand Down
Loading

0 comments on commit c1bfc32

Please sign in to comment.