Skip to content

Commit

Permalink
Add sync-service boilerplate
Browse files Browse the repository at this point in the history
  • Loading branch information
richardpringle committed Aug 30, 2023
1 parent b0ce83f commit 4739cd0
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 82 deletions.
13 changes: 8 additions & 5 deletions rpc/src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use rpc::{rpcdb::database_server::DatabaseServer, service::database::DatabaseService};

use rpc::{
rpcdb::database_server::DatabaseServer as RpcServer, sync::db_server::DbServer as SyncServer,
DatabaseService,
};
use std::sync::Arc;
use tonic::transport::Server;

// TODO: use clap to parse command line input to run the server
Expand All @@ -9,12 +12,12 @@ async fn main() {

println!("Database-Server listening on: {}", addr);

let svc = DatabaseService::default();
let svc = DatabaseServer::new(svc);
let svc = Arc::new(DatabaseService::default());

// TODO: graceful shutdown
Server::builder()
.add_service(svc)
.add_service(RpcServer::from_arc(svc.clone()))
.add_service(SyncServer::from_arc(svc))
.serve(addr)
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub mod rpcdb {
}

pub mod service;

pub use service::Database as DatabaseService;
79 changes: 79 additions & 0 deletions rpc/src/service.rs
Original file line number Diff line number Diff line change
@@ -1 +1,80 @@
use firewood::v2::{
api::{Db, Error},
emptydb::{EmptyDb, HistoricalImpl},
};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::Mutex;
use tonic::Status;

pub mod database;
pub mod db;

trait IntoStatusResult<T> {
fn into_status_result(self) -> Result<T, Status>;
}

impl<T> IntoStatusResult<T> for Result<T, Error> {
fn into_status_result(self) -> Result<T, Status> {
self.map_err(|err| match err {
Error::HashNotFound { provided: _ } => todo!(),
Error::IncorrectRootHash {
provided: _,
current: _,
} => todo!(),
Error::IO(_) => todo!(),
Error::InvalidProposal => todo!(),
_ => todo!(),
})
}
}
pub struct Database {
db: EmptyDb,
iterators: Arc<Mutex<Iterators>>,
}

impl Default for Database {
fn default() -> Self {
Self {
db: EmptyDb,
iterators: Default::default(),
}
}
}

impl Database {
async fn revision(&self) -> Result<Arc<HistoricalImpl>, Error> {
let root_hash = self.db.root_hash().await?;
self.db.revision(root_hash).await
}
}

// TODO: implement Iterator
struct Iter;

#[derive(Default)]
struct Iterators {
map: HashMap<u64, Iter>,
next_id: AtomicU64,
}

impl Iterators {
fn insert(&mut self, iter: Iter) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.map.insert(id, iter);
id
}

fn _get(&self, id: u64) -> Option<&Iter> {
self.map.get(&id)
}

fn remove(&mut self, id: u64) {
self.map.remove(&id);
}
}
80 changes: 3 additions & 77 deletions rpc/src/service/database.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::{Database as DatabaseService, IntoStatusResult, Iter};
use crate::rpcdb::{
database_server::Database, CloseRequest, CloseResponse, CompactRequest, CompactResponse,
DeleteRequest, DeleteResponse, GetRequest, GetResponse, HasRequest, HasResponse,
Expand All @@ -6,66 +7,10 @@ use crate::rpcdb::{
NewIteratorWithStartAndPrefixRequest, NewIteratorWithStartAndPrefixResponse, PutRequest,
PutResponse, WriteBatchRequest, WriteBatchResponse,
};
use firewood::v2::{
api::{BatchOp, Db, DbView, Error as DbError, Proposal},
emptydb::{EmptyDb, HistoricalImpl},
};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::Mutex;
use firewood::v2::api::{BatchOp, Db, DbView, Proposal};
use std::sync::Arc;
use tonic::{async_trait, Request, Response, Status};

pub struct DatabaseService {
db: EmptyDb,
iterators: Arc<Mutex<Iterators>>,
}

impl Default for DatabaseService {
fn default() -> Self {
Self {
db: EmptyDb,
iterators: Default::default(),
}
}
}

impl DatabaseService {
async fn revision(&self) -> Result<Arc<HistoricalImpl>, DbError> {
let root_hash = self.db.root_hash().await?;
self.db.revision(root_hash).await
}
}

// TODO: implement Iterator
struct Iter;

#[derive(Default)]
struct Iterators {
map: HashMap<u64, Iter>,
next_id: AtomicU64,
}

impl Iterators {
fn insert(&mut self, iter: Iter) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.map.insert(id, iter);
id
}

fn _get(&self, id: u64) -> Option<&Iter> {
self.map.get(&id)
}

fn remove(&mut self, id: u64) {
self.map.remove(&id);
}
}

#[async_trait]
impl Database for DatabaseService {
async fn has(&self, request: Request<HasRequest>) -> Result<Response<HasResponse>, Status> {
Expand Down Expand Up @@ -210,25 +155,6 @@ impl Database for DatabaseService {
}
}

trait IntoStatusResult<T> {
fn into_status_result(self) -> Result<T, Status>;
}

impl<T> IntoStatusResult<T> for Result<T, DbError> {
fn into_status_result(self) -> Result<T, Status> {
self.map_err(|err| match err {
DbError::HashNotFound { provided: _ } => todo!(),
DbError::IncorrectRootHash {
provided: _,
current: _,
} => todo!(),
DbError::IO(_) => todo!(),
DbError::InvalidProposal => todo!(),
_ => todo!(),
})
}
}

fn from_put_request(request: PutRequest) -> BatchOp<Vec<u8>, Vec<u8>> {
BatchOp::Put {
key: request.key,
Expand Down
101 changes: 101 additions & 0 deletions rpc/src/service/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use super::{Database, IntoStatusResult};
use crate::sync::{
db_server::Db as DbServerTrait, CommitChangeProofRequest, CommitRangeProofRequest,
GetChangeProofRequest, GetChangeProofResponse, GetMerkleRootResponse, GetProofRequest,
GetProofResponse, GetRangeProofRequest, GetRangeProofResponse, VerifyChangeProofRequest,
VerifyChangeProofResponse,
};
use firewood::v2::api::Db;
use tonic::{async_trait, Request, Response, Status};

#[async_trait]
impl DbServerTrait for Database {
async fn get_merkle_root(
&self,
_request: Request<()>,
) -> Result<Response<GetMerkleRootResponse>, Status> {
let root_hash = self.db.root_hash().await.into_status_result()?.to_vec();

let response = GetMerkleRootResponse { root_hash };

Ok(Response::new(response))
}

async fn get_proof(
&self,
request: Request<GetProofRequest>,
) -> Result<Response<GetProofResponse>, Status> {
let GetProofRequest { key: _ } = request.into_inner();
let _revision = self.revision().await.into_status_result()?;

todo!()
}

async fn get_change_proof(
&self,
request: Request<GetChangeProofRequest>,
) -> Result<Response<GetChangeProofResponse>, Status> {
let GetChangeProofRequest {
start_root_hash: _,
end_root_hash: _,
start_key: _,
end_key: _,
key_limit: _,
} = request.into_inner();

let _revision = self.revision().await.into_status_result()?;

todo!()
}

async fn verify_change_proof(
&self,
request: Request<VerifyChangeProofRequest>,
) -> Result<Response<VerifyChangeProofResponse>, Status> {
let VerifyChangeProofRequest {
proof: _,
start_key: _,
end_key: _,
expected_root_hash: _,
} = request.into_inner();

let _revision = self.revision().await.into_status_result()?;

todo!()
}

async fn commit_change_proof(
&self,
request: Request<CommitChangeProofRequest>,
) -> Result<Response<()>, Status> {
let CommitChangeProofRequest { proof: _ } = request.into_inner();

todo!()
}

async fn get_range_proof(
&self,
request: Request<GetRangeProofRequest>,
) -> Result<Response<GetRangeProofResponse>, Status> {
let GetRangeProofRequest {
root_hash: _,
start_key: _,
end_key: _,
key_limit: _,
} = request.into_inner();

todo!()
}

async fn commit_range_proof(
&self,
request: Request<CommitRangeProofRequest>,
) -> Result<Response<()>, Status> {
let CommitRangeProofRequest {
start_key: _,
range_proof: _,
} = request.into_inner();

todo!()
}
}

0 comments on commit 4739cd0

Please sign in to comment.