Skip to content

Commit

Permalink
Initialize cloud sync
Browse files Browse the repository at this point in the history
  • Loading branch information
fogodev committed Sep 13, 2024
1 parent 4d662b2 commit 9ccd9f4
Show file tree
Hide file tree
Showing 14 changed files with 301 additions and 84 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ rust-version = "1.80"

[workspace.dependencies]
# First party dependencies
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", branch = "main" }
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "25e4b92fdd" }

# Third party dependencies used by one or more of our crates
async-channel = "2.3"
Expand Down
6 changes: 6 additions & 0 deletions core/crates/cloud-services/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,9 @@ impl From<Error> for rspc::Error {
Self::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e)
}
}

impl From<GetTokenError> for rspc::Error {
fn from(e: GetTokenError) -> Self {
Self::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e)
}
}
4 changes: 3 additions & 1 deletion core/crates/cloud-services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ mod token_refresher;
pub use client::CloudServices;
pub use error::{Error, GetTokenError};
pub use key_manager::KeyManager;
pub use p2p::{CloudP2P, JoinSyncGroupResponse, NotifyUser, Ticket, UserResponse};
pub use p2p::{
CloudP2P, JoinSyncGroupResponse, JoinedLibraryCreateArgs, NotifyUser, Ticket, UserResponse,
};
pub use sync::{
declare_actors as declare_cloud_sync, SyncActors as CloudSyncActors,
SyncActorsState as CloudSyncActorsState,
Expand Down
20 changes: 18 additions & 2 deletions core/crates/cloud-services/src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{CloudServices, Error};
use sd_cloud_schema::{
cloud_p2p::{authorize_new_device_in_sync_group, CloudP2PALPN, CloudP2PError},
devices::{self, Device},
libraries,
sync::groups::GroupWithLibraryAndDevices,
};
use sd_crypto::{CryptoRng, SeedableRng};
Expand All @@ -14,13 +15,20 @@ use iroh_net::{
Endpoint, NodeId,
};
use serde::{Deserialize, Serialize};
use tokio::spawn;
use tokio::{spawn, sync::oneshot};
use tracing::error;

mod runner;

use runner::Runner;

#[derive(Debug)]
pub struct JoinedLibraryCreateArgs {
pub pub_id: libraries::PubId,
pub name: String,
pub description: Option<String>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, specta::Type)]
#[serde(transparent)]
#[repr(transparent)]
Expand Down Expand Up @@ -66,7 +74,13 @@ pub enum JoinSyncGroupResponse {
#[derive(Debug, Deserialize, specta::Type)]
#[serde(tag = "kind", content = "data")]
pub enum UserResponse {
AcceptDeviceInSyncGroup { ticket: Ticket, accepted: bool },
AcceptDeviceInSyncGroup {
ticket: Ticket,
accepted: bool,
library_pub_id: libraries::PubId,
library_name: String,
library_description: Option<String>,
},
}
#[derive(Debug, Clone)]
pub struct CloudP2P {
Expand Down Expand Up @@ -128,11 +142,13 @@ impl CloudP2P {
&self,
devices_in_group: Vec<(devices::PubId, NodeId)>,
req: authorize_new_device_in_sync_group::Request,
tx: oneshot::Sender<JoinedLibraryCreateArgs>,
) {
self.msgs_tx
.send_async(runner::Message::Request(runner::Request::JoinSyncGroup {
req,
devices_in_group,
tx,
}))
.await
.expect("Channel closed");
Expand Down
52 changes: 45 additions & 7 deletions core/crates/cloud-services/src/p2p/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use sd_cloud_schema::{
self, authorize_new_device_in_sync_group, Client, CloudP2PALPN, CloudP2PError, Service,
},
devices::{self, Device},
libraries,
sync::groups,
};
use sd_crypto::{CryptoRng, SeedableRng};
Expand All @@ -32,14 +33,14 @@ use quic_rpc::{
};
use tokio::{
spawn,
sync::Mutex,
sync::{oneshot, Mutex},
task::JoinHandle,
time::{interval, Instant, MissedTickBehavior},
};
use tokio_stream::wrappers::IntervalStream;
use tracing::{debug, error, warn};

use super::{JoinSyncGroupResponse, NotifyUser, Ticket, UserResponse};
use super::{JoinSyncGroupResponse, JoinedLibraryCreateArgs, NotifyUser, Ticket, UserResponse};

const TEN_SECONDS: Duration = Duration::from_secs(10);
const FIVE_MINUTES: Duration = Duration::from_secs(60 * 5);
Expand All @@ -54,6 +55,7 @@ pub enum Request {
JoinSyncGroup {
req: authorize_new_device_in_sync_group::Request,
devices_in_group: Vec<(devices::PubId, NodeId)>,
tx: oneshot::Sender<JoinedLibraryCreateArgs>,
},
}

Expand Down Expand Up @@ -177,13 +179,24 @@ impl Runner {
StreamMessage::Message(Message::Request(Request::JoinSyncGroup {
req,
devices_in_group,
})) => self.dispatch_join_requests(req, devices_in_group, &mut rng),
tx,
})) => self.dispatch_join_requests(req, devices_in_group, &mut rng, tx),

StreamMessage::UserResponse(UserResponse::AcceptDeviceInSyncGroup {
ticket,
accepted,
library_pub_id,
library_name,
library_description,
}) => {
self.handle_join_response(ticket, accepted).await;
self.handle_join_response(
ticket,
accepted,
library_pub_id,
library_name,
library_description,
)
.await;
}

StreamMessage::Tick => self.tick().await,
Expand All @@ -201,13 +214,15 @@ impl Runner {
req: authorize_new_device_in_sync_group::Request,
devices_in_group: Vec<(devices::PubId, NodeId)>,
rng: &mut CryptoRng,
tx: oneshot::Sender<JoinedLibraryCreateArgs>,
) {
async fn inner(
key_manager: Arc<KeyManager>,
endpoint: Endpoint,
mut rng: CryptoRng,
req: authorize_new_device_in_sync_group::Request,
devices_in_group: Vec<(devices::PubId, NodeId)>,
tx: oneshot::Sender<JoinedLibraryCreateArgs>,
) -> Result<JoinSyncGroupResponse, Error> {
let group_pub_id = req.sync_group.pub_id;
loop {
Expand All @@ -226,6 +241,9 @@ impl Runner {
Ok(authorize_new_device_in_sync_group::Response {
authorizor_device,
keys,
library_pub_id,
library_name,
library_description,
}) => {
key_manager
.add_many_keys(
Expand All @@ -239,7 +257,17 @@ impl Runner {
)
.await?;

// TODO(@fogodev): Figure out a way to dispatch sync related actors now that we have the keys
if tx
.send(JoinedLibraryCreateArgs {
pub_id: library_pub_id,
name: library_name,
description: library_description,
})
.is_err()
{
error!("Failed to handle library creation locally from received library data");
return Ok(JoinSyncGroupResponse::CriticalError);
}

return Ok(JoinSyncGroupResponse::Accepted { authorizor_device });
}
Expand All @@ -260,7 +288,7 @@ impl Runner {

if let Err(SendError(response)) = notify_user_tx
.send_async(NotifyUser::ReceivedJoinSyncGroupResponse {
response: inner(key_manager, endpoint, rng, req, devices_in_group)
response: inner(key_manager, endpoint, rng, req, devices_in_group, tx)
.await
.unwrap_or_else(|e| {
error!(
Expand Down Expand Up @@ -326,7 +354,14 @@ impl Runner {
}
}

async fn handle_join_response(&self, ticket: Ticket, accepted: bool) {
async fn handle_join_response(
&self,
ticket: Ticket,
accepted: bool,
library_pub_id: libraries::PubId,
library_name: String,
library_description: Option<String>,
) {
let Some(PendingSyncGroupJoin {
channel,
request,
Expand Down Expand Up @@ -355,6 +390,9 @@ impl Runner {
.into_iter()
.map(Into::into)
.collect(),
library_pub_id,
library_name,
library_description,
})
} else {
Err(CloudP2PError::Rejected)
Expand Down
8 changes: 4 additions & 4 deletions core/src/api/cloud/devices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ pub struct DeviceRegisterData {
pub pub_id: PubId,
pub name: String,
pub os: DeviceOS,
pub storage_size: u64,
pub connection_id: NodeId,
pub hardware_model: HardwareModel,
pub storage_size: u64,
pub used_storage: u64,
pub connection_id: NodeId,
}

pub async fn register(
Expand All @@ -231,10 +231,10 @@ pub async fn register(
pub_id,
name,
os,
storage_size,
connection_id,
hardware_model,
storage_size,
used_storage,
connection_id,
}: DeviceRegisterData,
hashed_pub_id: Hash,
rng: &mut CryptoRng,
Expand Down
Loading

0 comments on commit 9ccd9f4

Please sign in to comment.