From 21459f4d08c990c2f2921cee72a8fa6cacb707ee Mon Sep 17 00:00:00 2001 From: Richard Pringle Date: Tue, 29 Aug 2023 14:17:15 -0400 Subject: [PATCH] Initial rpc-server implementation --- rpc/Cargo.toml | 18 +++ rpc/build.rs | 6 + rpc/proto/rpcdb/rpcdb.proto | 127 ++++++++++++++++++ rpc/proto/sync/sync.proto | 166 +++++++++++++++++++++++ rpc/src/bin/client.rs | 3 + rpc/src/bin/server.rs | 258 ++++++++++++++++++++++++++++++++++++ rpc/src/lib.rs | 7 + rpc/src/main.rs | 3 - 8 files changed, 585 insertions(+), 3 deletions(-) create mode 100644 rpc/build.rs create mode 100644 rpc/proto/rpcdb/rpcdb.proto create mode 100644 rpc/proto/sync/sync.proto create mode 100644 rpc/src/bin/client.rs create mode 100644 rpc/src/bin/server.rs create mode 100644 rpc/src/lib.rs delete mode 100644 rpc/src/main.rs diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 39e82df22..16f4bd074 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -5,4 +5,22 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bin]] +name = "server" +test = false +bench = false + +[[bin]] +name = "client" +test = false +bench = false + [dependencies] +firewood = { version = "0.0.3", path = "../firewood" } +prost = "0.11.9" +thiserror = "1.0.47" +tokio = { version = "1.32.0", features = ["sync", "rt-multi-thread"] } +tonic = { version = "0.9.2", features = ["tls"] } + +[build-dependencies] +tonic-build = "0.9.2" diff --git a/rpc/build.rs b/rpc/build.rs new file mode 100644 index 000000000..54a51676e --- /dev/null +++ b/rpc/build.rs @@ -0,0 +1,6 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/sync/sync.proto")?; + tonic_build::compile_protos("proto/rpcdb/rpcdb.proto")?; + + Ok(()) +} diff --git a/rpc/proto/rpcdb/rpcdb.proto b/rpc/proto/rpcdb/rpcdb.proto new file mode 100644 index 000000000..420d2b7a8 --- /dev/null +++ b/rpc/proto/rpcdb/rpcdb.proto @@ -0,0 +1,127 @@ +syntax = "proto3"; + +package rpcdb; + +import "google/protobuf/empty.proto"; + +option go_package = "github.com/ava-labs/avalanchego/proto/pb/rpcdb"; + +service Database { + rpc Has(HasRequest) returns (HasResponse); + rpc Get(GetRequest) returns (GetResponse); + rpc Put(PutRequest) returns (PutResponse); + rpc Delete(DeleteRequest) returns (DeleteResponse); + rpc Compact(CompactRequest) returns (CompactResponse); + rpc Close(CloseRequest) returns (CloseResponse); + rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResponse); + rpc WriteBatch(WriteBatchRequest) returns (WriteBatchResponse); + rpc NewIteratorWithStartAndPrefix(NewIteratorWithStartAndPrefixRequest) returns (NewIteratorWithStartAndPrefixResponse); + rpc IteratorNext(IteratorNextRequest) returns (IteratorNextResponse); + rpc IteratorError(IteratorErrorRequest) returns (IteratorErrorResponse); + rpc IteratorRelease(IteratorReleaseRequest) returns (IteratorReleaseResponse); +} + +enum Error { + // ERROR_UNSPECIFIED is used to indicate that no error occurred. + ERROR_UNSPECIFIED = 0; + ERROR_CLOSED = 1; + ERROR_NOT_FOUND = 2; +} + +message HasRequest { + bytes key = 1; +} + +message HasResponse { + bool has = 1; + Error err = 2; +} + +message GetRequest { + bytes key = 1; +} + +message GetResponse { + bytes value = 1; + Error err = 2; +} + +message PutRequest { + bytes key = 1; + bytes value = 2; +} + +message PutResponse { + Error err = 1; +} + +message DeleteRequest { + bytes key = 1; +} + +message DeleteResponse { + Error err = 1; +} + +message CompactRequest { + bytes start = 1; + bytes limit = 2; +} + +message CompactResponse { + Error err = 1; +} + +message CloseRequest {} + +message CloseResponse { + Error err = 1; +} + +message WriteBatchRequest { + repeated PutRequest puts = 1; + repeated DeleteRequest deletes = 2; +} + +message WriteBatchResponse { + Error err = 1; +} + +message NewIteratorRequest {} + +message NewIteratorWithStartAndPrefixRequest { + bytes start = 1; + bytes prefix = 2; +} + +message NewIteratorWithStartAndPrefixResponse { + uint64 id = 1; +} + +message IteratorNextRequest { + uint64 id = 1; +} + +message IteratorNextResponse { + repeated PutRequest data = 1; +} + +message IteratorErrorRequest { + uint64 id = 1; +} + +message IteratorErrorResponse { + Error err = 1; +} + +message IteratorReleaseRequest { + uint64 id = 1; +} + +message IteratorReleaseResponse { + Error err = 1; +} + +message HealthCheckResponse { + bytes details = 1; +} diff --git a/rpc/proto/sync/sync.proto b/rpc/proto/sync/sync.proto new file mode 100644 index 000000000..e1c1ccd22 --- /dev/null +++ b/rpc/proto/sync/sync.proto @@ -0,0 +1,166 @@ +syntax = "proto3"; + +package sync; + +import "google/protobuf/empty.proto"; + +option go_package = "github.com/ava-labs/avalanchego/proto/pb/sync"; + +// Request represents a request for information during syncing. +message Request { + oneof message { + SyncGetRangeProofRequest range_proof_request = 1; + SyncGetChangeProofRequest change_proof_request = 2; + } +} + +// The interface required by an x/sync/SyncManager for syncing. +// Note this service definition only exists for use in tests. +// A database shouldn't expose this over the internet, as it +// allows for reading/writing to the database. +service DB { + rpc GetMerkleRoot(google.protobuf.Empty) returns (GetMerkleRootResponse); + + rpc GetProof(GetProofRequest) returns (GetProofResponse); + + rpc GetChangeProof(GetChangeProofRequest) returns (GetChangeProofResponse); + rpc VerifyChangeProof(VerifyChangeProofRequest) returns (VerifyChangeProofResponse); + rpc CommitChangeProof(CommitChangeProofRequest) returns (google.protobuf.Empty); + + rpc GetRangeProof(GetRangeProofRequest) returns (GetRangeProofResponse); + rpc CommitRangeProof(CommitRangeProofRequest) returns (google.protobuf.Empty); +} + +message GetMerkleRootResponse { + bytes root_hash = 1; +} + +message GetProofRequest { + bytes key = 1; +} + +message GetProofResponse { + Proof proof = 1; +} + +message Proof { + bytes key = 1; + MaybeBytes value = 2; + repeated ProofNode proof = 3; +} + +// For use in sync client, which has a restriction on the size of +// the response. GetChangeProof in the DB service doesn't. +message SyncGetChangeProofRequest { + bytes start_root_hash = 1; + bytes end_root_hash = 2; + MaybeBytes start_key = 3; + MaybeBytes end_key = 4; + uint32 key_limit = 5; + uint32 bytes_limit = 6; +} + +message SyncGetChangeProofResponse { + oneof response { + ChangeProof change_proof = 1; + RangeProof range_proof = 2; + } +} + +message GetChangeProofRequest { + bytes start_root_hash = 1; + bytes end_root_hash = 2; + MaybeBytes start_key = 3; + MaybeBytes end_key = 4; + uint32 key_limit = 5; +} + +message GetChangeProofResponse { + oneof response { + ChangeProof change_proof = 1; + // True iff server errored with merkledb.ErrInsufficientHistory. + bool root_not_present = 2; + } +} + +message VerifyChangeProofRequest { + ChangeProof proof = 1; + MaybeBytes start_key = 2; + MaybeBytes end_key = 3; + bytes expected_root_hash = 4; +} + +message VerifyChangeProofResponse { + // If empty, there was no error. + string error = 1; +} + +message CommitChangeProofRequest { + ChangeProof proof = 1; +} + +// For use in sync client, which has a restriction on the size of +// the response. GetRangeProof in the DB service doesn't. +message SyncGetRangeProofRequest { + bytes root_hash = 1; + MaybeBytes start_key = 2; + MaybeBytes end_key = 3; + uint32 key_limit = 4; + uint32 bytes_limit = 5; +} + +message GetRangeProofRequest { + bytes root_hash = 1; + MaybeBytes start_key = 2; + MaybeBytes end_key = 3; + uint32 key_limit = 4; +} + +message GetRangeProofResponse { + RangeProof proof = 1; +} + +message CommitRangeProofRequest { + MaybeBytes start_key = 1; + RangeProof range_proof = 2; +} + +message ChangeProof { + repeated ProofNode start_proof = 1; + repeated ProofNode end_proof = 2; + repeated KeyChange key_changes = 3; +} + +message RangeProof { + repeated ProofNode start = 1; + repeated ProofNode end = 2; + repeated KeyValue key_values = 3; +} + +message ProofNode { + SerializedPath key = 1; + MaybeBytes value_or_hash = 2; + map children = 3; +} + +message KeyChange { + bytes key = 1; + MaybeBytes value = 2; +} + +message SerializedPath { + uint64 nibble_length = 1; + bytes value = 2; +} + +message MaybeBytes { + bytes value = 1; + // If false, this is None. + // Otherwise this is Some. + bool is_nothing = 2; +} + +message KeyValue { + bytes key = 1; + bytes value = 2; +} diff --git a/rpc/src/bin/client.rs b/rpc/src/bin/client.rs new file mode 100644 index 000000000..12b90dcd5 --- /dev/null +++ b/rpc/src/bin/client.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello from {}", file!()); +} diff --git a/rpc/src/bin/server.rs b/rpc/src/bin/server.rs new file mode 100644 index 000000000..2fc8473d8 --- /dev/null +++ b/rpc/src/bin/server.rs @@ -0,0 +1,258 @@ +use firewood::v2::{ + api::{BatchOp, Db, DbView, Error as DbError, Proposal}, + emptydb::{EmptyDb, HistoricalImpl}, +}; +use rpc::rpcdb::{ + database_server::{self, DatabaseServer}, + CloseRequest, CloseResponse, CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, + GetRequest, GetResponse, HasRequest, HasResponse, HealthCheckResponse, IteratorErrorRequest, + IteratorErrorResponse, IteratorNextRequest, IteratorNextResponse, IteratorReleaseRequest, + IteratorReleaseResponse, NewIteratorWithStartAndPrefixRequest, + NewIteratorWithStartAndPrefixResponse, PutRequest, PutResponse, WriteBatchRequest, + WriteBatchResponse, +}; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; +use tokio::sync::Mutex; +use tonic::{async_trait, transport::Server, Request, Response, Status}; + +struct DatabaseService { + db: EmptyDb, + iterators: Arc>, +} + +impl DatabaseService { + fn new() -> Self { + let db = EmptyDb; + let iterators = Arc::new(Mutex::new(Iterators::default())); + + Self { db, iterators } + } + + async fn revision(&self) -> Result, DbError> { + let root_hash = self.db.root_hash().await?; + self.db.revision(root_hash).await + } +} + +// TODO: implement Iterator +struct Iter; + +#[derive(Default)] +struct Iterators { + map: HashMap, + next_id: AtomicU64, +} + +impl Iterators { + fn insert(&mut self, iter: Iter) -> u64 { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + self.map.insert(id, iter); + id + } + + fn _get(&self, id: u64) -> Option<&Iter> { + self.map.get(&id) + } + + fn remove(&mut self, id: u64) { + self.map.remove(&id); + } +} + +#[async_trait] +impl database_server::Database for DatabaseService { + async fn has(&self, request: Request) -> Result, Status> { + let key = request.into_inner().key; + let revision = self.revision().await.into_status_result()?; + + let val = revision.val(key).await.into_status_result()?; + + let response = HasResponse { + has: val.is_some(), + ..Default::default() + }; + + Ok(Response::new(response)) + } + + async fn get(&self, request: Request) -> Result, Status> { + let key = request.into_inner().key; + let revision = self.revision().await.into_status_result()?; + + let value = revision + .val(key) + .await + .into_status_result()? + .map(|v| v.to_vec()); + + let Some(value) = value else { + return Err(Status::not_found("key not found")); + }; + + let response = GetResponse { + value, + ..Default::default() + }; + + Ok(Response::new(response)) + } + + async fn put(&self, request: Request) -> Result, Status> { + let PutRequest { key, value } = request.into_inner(); + let batch = BatchOp::Put { key, value }; + let proposal = Arc::new(self.db.propose(vec![batch]).await.into_status_result()?); + let _ = proposal.commit().await.into_status_result()?; + + Ok(Response::new(PutResponse::default())) + } + + async fn delete( + &self, + request: Request, + ) -> Result, Status> { + let DeleteRequest { key } = request.into_inner(); + let batch = BatchOp::<_, Vec>::Delete { key }; + let propoal = Arc::new(self.db.propose(vec![batch]).await.into_status_result()?); + let _ = propoal.commit().await.into_status_result()?; + + Ok(Response::new(DeleteResponse::default())) + } + + async fn compact( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("compact not implemented")) + } + + async fn close( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("close not implemented")) + } + + async fn health_check( + &self, + _request: Request<()>, + ) -> Result, Status> { + // TODO: why is the response a Vec? + Ok(Response::new(HealthCheckResponse::default())) + } + + async fn write_batch( + &self, + request: Request, + ) -> Result, Status> { + let WriteBatchRequest { puts, deletes } = request.into_inner(); + let batch = puts + .into_iter() + .map(from_put_request) + .chain(deletes.into_iter().map(from_delete_request)) + .collect(); + let proposal = Arc::new(self.db.propose(batch).await.into_status_result()?); + let _ = proposal.commit().await.into_status_result()?; + + Ok(Response::new(WriteBatchResponse::default())) + } + + async fn new_iterator_with_start_and_prefix( + &self, + request: Request, + ) -> Result, Status> { + let NewIteratorWithStartAndPrefixRequest { + start: _, + prefix: _, + } = request.into_inner(); + + // TODO: create the actual iterator + let id = { + let mut iters = self.iterators.lock().await; + iters.insert(Iter) + }; + + Ok(Response::new(NewIteratorWithStartAndPrefixResponse { id })) + } + + async fn iterator_next( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("iterator_next not implemented")) + } + + async fn iterator_error( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("iterator_error not implemented")) + } + + async fn iterator_release( + &self, + request: Request, + ) -> Result, Status> { + let IteratorReleaseRequest { id } = request.into_inner(); + + { + let mut iters = self.iterators.lock().await; + iters.remove(id); + } + + Ok(Response::new(IteratorReleaseResponse::default())) + } +} + +trait IntoStatusResult { + fn into_status_result(self) -> Result; +} + +impl IntoStatusResult for Result { + fn into_status_result(self) -> Result { + self.map_err(|err| match err { + DbError::HashNotFound { provided: _ } => todo!(), + DbError::IncorrectRootHash { + provided: _, + current: _, + } => todo!(), + DbError::IO(_) => todo!(), + DbError::InvalidProposal => todo!(), + _ => todo!(), + }) + } +} + +fn from_put_request(request: PutRequest) -> BatchOp, Vec> { + BatchOp::Put { + key: request.key, + value: request.value, + } +} + +fn from_delete_request(request: DeleteRequest) -> BatchOp, Vec> { + BatchOp::Delete { key: request.key } +} + +// TODO: use clap to parse command line input to run the server +#[tokio::main] +async fn main() { + let addr = "[::1]:10000".parse().unwrap(); + + println!("Database-Server listening on: {}", addr); + + let svc = DatabaseService::new(); + let svc = DatabaseServer::new(svc); + + // TODO: graceful shutdown + Server::builder() + .add_service(svc) + .serve(addr) + .await + .unwrap(); +} diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs new file mode 100644 index 000000000..4fe46ff36 --- /dev/null +++ b/rpc/src/lib.rs @@ -0,0 +1,7 @@ +pub mod sync { + tonic::include_proto!("sync"); +} + +pub mod rpcdb { + tonic::include_proto!("rpcdb"); +} diff --git a/rpc/src/main.rs b/rpc/src/main.rs deleted file mode 100644 index e7a11a969..000000000 --- a/rpc/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -}