Skip to content

Commit

Permalink
Sync grpc implementation (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardpringle authored Aug 31, 2023
1 parent 9f10bc0 commit 5b0ed02
Show file tree
Hide file tree
Showing 5 changed files with 360 additions and 242 deletions.
250 changes: 8 additions & 242 deletions rpc/src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,243 +1,9 @@
use firewood::v2::{
api::{BatchOp, Db, DbView, Error as DbError, Proposal},
emptydb::{EmptyDb, HistoricalImpl},
use rpc::{
rpcdb::database_server::DatabaseServer as RpcServer, sync::db_server::DbServer as SyncServer,
DatabaseService,
};
use rpc::rpcdb::{
database_server::{self, DatabaseServer},
CloseRequest, CloseResponse, CompactRequest, CompactResponse, DeleteRequest, DeleteResponse,
GetRequest, GetResponse, HasRequest, HasResponse, HealthCheckResponse, IteratorErrorRequest,
IteratorErrorResponse, IteratorNextRequest, IteratorNextResponse, IteratorReleaseRequest,
IteratorReleaseResponse, NewIteratorWithStartAndPrefixRequest,
NewIteratorWithStartAndPrefixResponse, PutRequest, PutResponse, WriteBatchRequest,
WriteBatchResponse,
};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::Mutex;
use tonic::{async_trait, transport::Server, Request, Response, Status};

struct DatabaseService {
db: EmptyDb,
iterators: Arc<Mutex<Iterators>>,
}

impl DatabaseService {
fn new() -> Self {
let db = EmptyDb;
let iterators = Arc::new(Mutex::new(Iterators::default()));

Self { db, iterators }
}

async fn revision(&self) -> Result<Arc<HistoricalImpl>, DbError> {
let root_hash = self.db.root_hash().await?;
self.db.revision(root_hash).await
}
}

// TODO: implement Iterator
struct Iter;

#[derive(Default)]
struct Iterators {
map: HashMap<u64, Iter>,
next_id: AtomicU64,
}

impl Iterators {
fn insert(&mut self, iter: Iter) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.map.insert(id, iter);
id
}

fn _get(&self, id: u64) -> Option<&Iter> {
self.map.get(&id)
}

fn remove(&mut self, id: u64) {
self.map.remove(&id);
}
}

#[async_trait]
impl database_server::Database for DatabaseService {
async fn has(&self, request: Request<HasRequest>) -> Result<Response<HasResponse>, Status> {
let key = request.into_inner().key;
let revision = self.revision().await.into_status_result()?;

let val = revision.val(key).await.into_status_result()?;

let response = HasResponse {
has: val.is_some(),
..Default::default()
};

Ok(Response::new(response))
}

async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
let key = request.into_inner().key;
let revision = self.revision().await.into_status_result()?;

let value = revision
.val(key)
.await
.into_status_result()?
.map(|v| v.to_vec());

let Some(value) = value else {
return Err(Status::not_found("key not found"));
};

let response = GetResponse {
value,
..Default::default()
};

Ok(Response::new(response))
}

async fn put(&self, request: Request<PutRequest>) -> Result<Response<PutResponse>, Status> {
let PutRequest { key, value } = request.into_inner();
let batch = BatchOp::Put { key, value };
let proposal = Arc::new(self.db.propose(vec![batch]).await.into_status_result()?);
let _ = proposal.commit().await.into_status_result()?;

Ok(Response::new(PutResponse::default()))
}

async fn delete(
&self,
request: Request<DeleteRequest>,
) -> Result<Response<DeleteResponse>, Status> {
let DeleteRequest { key } = request.into_inner();
let batch = BatchOp::<_, Vec<u8>>::Delete { key };
let propoal = Arc::new(self.db.propose(vec![batch]).await.into_status_result()?);
let _ = propoal.commit().await.into_status_result()?;

Ok(Response::new(DeleteResponse::default()))
}

async fn compact(
&self,
_request: Request<CompactRequest>,
) -> Result<Response<CompactResponse>, Status> {
Err(Status::unimplemented("compact not implemented"))
}

async fn close(
&self,
_request: Request<CloseRequest>,
) -> Result<Response<CloseResponse>, Status> {
Err(Status::unimplemented("close not implemented"))
}

async fn health_check(
&self,
_request: Request<()>,
) -> Result<Response<HealthCheckResponse>, Status> {
// TODO: why is the response a Vec<u8>?
Ok(Response::new(HealthCheckResponse::default()))
}

async fn write_batch(
&self,
request: Request<WriteBatchRequest>,
) -> Result<Response<WriteBatchResponse>, Status> {
let WriteBatchRequest { puts, deletes } = request.into_inner();
let batch = puts
.into_iter()
.map(from_put_request)
.chain(deletes.into_iter().map(from_delete_request))
.collect();
let proposal = Arc::new(self.db.propose(batch).await.into_status_result()?);
let _ = proposal.commit().await.into_status_result()?;

Ok(Response::new(WriteBatchResponse::default()))
}

async fn new_iterator_with_start_and_prefix(
&self,
request: Request<NewIteratorWithStartAndPrefixRequest>,
) -> Result<Response<NewIteratorWithStartAndPrefixResponse>, Status> {
let NewIteratorWithStartAndPrefixRequest {
start: _,
prefix: _,
} = request.into_inner();

// TODO: create the actual iterator
let id = {
let mut iters = self.iterators.lock().await;
iters.insert(Iter)
};

Ok(Response::new(NewIteratorWithStartAndPrefixResponse { id }))
}

async fn iterator_next(
&self,
_request: Request<IteratorNextRequest>,
) -> Result<Response<IteratorNextResponse>, Status> {
Err(Status::unimplemented("iterator_next not implemented"))
}

async fn iterator_error(
&self,
_request: Request<IteratorErrorRequest>,
) -> Result<Response<IteratorErrorResponse>, Status> {
Err(Status::unimplemented("iterator_error not implemented"))
}

async fn iterator_release(
&self,
request: Request<IteratorReleaseRequest>,
) -> Result<Response<IteratorReleaseResponse>, Status> {
let IteratorReleaseRequest { id } = request.into_inner();

{
let mut iters = self.iterators.lock().await;
iters.remove(id);
}

Ok(Response::new(IteratorReleaseResponse::default()))
}
}

trait IntoStatusResult<T> {
fn into_status_result(self) -> Result<T, Status>;
}

impl<T> IntoStatusResult<T> for Result<T, DbError> {
fn into_status_result(self) -> Result<T, Status> {
self.map_err(|err| match err {
DbError::HashNotFound { provided: _ } => todo!(),
DbError::IncorrectRootHash {
provided: _,
current: _,
} => todo!(),
DbError::IO(_) => todo!(),
DbError::InvalidProposal => todo!(),
_ => todo!(),
})
}
}

fn from_put_request(request: PutRequest) -> BatchOp<Vec<u8>, Vec<u8>> {
BatchOp::Put {
key: request.key,
value: request.value,
}
}

fn from_delete_request(request: DeleteRequest) -> BatchOp<Vec<u8>, Vec<u8>> {
BatchOp::Delete { key: request.key }
}
use std::sync::Arc;
use tonic::transport::Server;

// TODO: use clap to parse command line input to run the server
#[tokio::main]
Expand All @@ -246,12 +12,12 @@ async fn main() {

println!("Database-Server listening on: {}", addr);

let svc = DatabaseService::new();
let svc = DatabaseServer::new(svc);
let svc = Arc::new(DatabaseService::default());

// TODO: graceful shutdown
Server::builder()
.add_service(svc)
.add_service(RpcServer::from_arc(svc.clone()))
.add_service(SyncServer::from_arc(svc))
.serve(addr)
.await
.unwrap();
Expand Down
4 changes: 4 additions & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ pub mod sync {
pub mod rpcdb {
tonic::include_proto!("rpcdb");
}

pub mod service;

pub use service::Database as DatabaseService;
80 changes: 80 additions & 0 deletions rpc/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use firewood::v2::{
api::{Db, Error},
emptydb::{EmptyDb, HistoricalImpl},
};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::Mutex;
use tonic::Status;

pub mod database;
pub mod db;

trait IntoStatusResultExt<T> {
fn into_status_result(self) -> Result<T, Status>;
}

impl<T> IntoStatusResultExt<T> for Result<T, Error> {
fn into_status_result(self) -> Result<T, Status> {
self.map_err(|err| match err {
Error::HashNotFound { provided: _ } => todo!(),
Error::IncorrectRootHash {
provided: _,
current: _,
} => todo!(),
Error::IO(_) => todo!(),
Error::InvalidProposal => todo!(),
_ => todo!(),
})
}
}
pub struct Database {
db: EmptyDb,
iterators: Arc<Mutex<Iterators>>,
}

impl Default for Database {
fn default() -> Self {
Self {
db: EmptyDb,
iterators: Default::default(),
}
}
}

impl Database {
async fn revision(&self) -> Result<Arc<HistoricalImpl>, Error> {
let root_hash = self.db.root_hash().await?;
self.db.revision(root_hash).await
}
}

// TODO: implement Iterator
struct Iter;

#[derive(Default)]
struct Iterators {
map: HashMap<u64, Iter>,
next_id: AtomicU64,
}

impl Iterators {
fn insert(&mut self, iter: Iter) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.map.insert(id, iter);
id
}

fn _get(&self, id: u64) -> Option<&Iter> {
self.map.get(&id)
}

fn remove(&mut self, id: u64) {
self.map.remove(&id);
}
}
Loading

0 comments on commit 5b0ed02

Please sign in to comment.