Skip to content

Commit

Permalink
refactor: add KVPbCrudApi as abstraction layer for commonly used CRUD…
Browse files Browse the repository at this point in the history
… operations (#16479)

* chore: refactor rename_table()

* chore: refactor SchemaApi::create_index()

* chore: simplify SchemaApi::get_table_meta_history()

* refactor: add KVPbCrudApi as abstraction layer for commonly used CRUD operations

* chore: remove unused append_update_stream_meta_requests()

* chore: fixup lint
  • Loading branch information
drmingdrmer committed Sep 20, 2024
1 parent ed39081 commit 207ee9e
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 206 deletions.
44 changes: 44 additions & 0 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_common_meta_kvapi::kvapi::NonEmptyItem;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::Change;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use futures::future::FutureExt;
Expand Down Expand Up @@ -124,6 +125,22 @@ pub trait KVPbApi: KVApi {
}
}

/// Same as [`get_pb`](Self::get_pb)` but returns seq and value separately.
fn get_pb_seq_and_value<K>(
&self,
key: &K,
) -> impl Future<Output = Result<(u64, Option<K::ValueType>), Self::Error>> + Send
where
K: kvapi::Key + Send + Sync,
K::ValueType: FromToProto,
Self::Error: From<PbApiReadError<Self::Error>>,
{
async move {
let seq_v = self.get_pb(key).await?;
Ok((seq_v.seq(), seq_v.into_value()))
}
}

/// Get protobuf encoded value by kvapi::Key.
///
/// The key will be converted to string and the returned value is decoded by `FromToProto`.
Expand Down Expand Up @@ -161,6 +178,16 @@ pub trait KVPbApi: KVApi {
}
}

/// Get seq by [`kvapi::Key`].
fn get_seq<K>(&self, key: &K) -> impl Future<Output = Result<u64, Self::Error>> + Send
where K: kvapi::Key {
let key = key.to_string_key();
async move {
let raw_seqv = self.get_kv(&key).await?;
Ok(raw_seqv.seq())
}
}

/// Same as `get_pb_stream` but does not return keys, only values.
///
/// It guaranteed to return the same number of results as the input keys.
Expand Down Expand Up @@ -194,6 +221,23 @@ pub trait KVPbApi: KVApi {
})
}

/// Same as [`get_pb_stream`](Self::get_pb_stream) but collect the result in a `Vec` instead of a stream.
fn get_pb_vec<K, I>(
&self,
keys: I,
) -> impl Future<Output = Result<Vec<(K, Option<SeqV<K::ValueType>>)>, Self::Error>> + Send
where
K: kvapi::Key + Send + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K> + Send,
Self::Error: From<PbApiReadError<Self::Error>>,
{
async move {
let kvs = self.get_pb_stream(keys).await?.try_collect().await?;
Ok(kvs)
}
}

/// Get protobuf encoded values by a series of kvapi::Key.
///
/// The key will be converted to string and the returned value is decoded by `FromToProto`.
Expand Down
91 changes: 91 additions & 0 deletions src/meta/api/src/kv_pb_crud_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::Change;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::With;
use databend_common_proto_conv::FromToProto;
use fastrace::func_name;
use log::debug;

use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::meta_txn_error::MetaTxnError;
use crate::txn_backoff::txn_backoff;

/// [`KVPbCrudApi`] provide generic meta-service access pattern implementations for `name -> value` mapping.
///
/// `K` is the key type for name.
/// `K::ValueType` is the value type.
#[tonic::async_trait]
pub trait KVPbCrudApi<K>: KVApi<Error = MetaError>
where
K: kvapi::Key + Clone + Send + Sync + 'static,
K::ValueType: FromToProto + Clone + Send + Sync + 'static,
{
/// Update or insert a `name -> value` mapping.
///
/// The `update` function is called with the previous value and should output the updated to write back.
/// - Ok(Some(x)): write back `x`.
/// - Ok(None): cancel the update.
/// - Err(e): return error.
///
/// This function returns an embedded result,
/// - the outer result is for underlying kvapi error,
/// - the inner result is for business logic error.
async fn upsert_with<E>(
&self,
name_ident: &K,
update: impl Fn(Option<SeqV<K::ValueType>>) -> Result<Option<K::ValueType>, E> + Send,
) -> Result<Result<Change<K::ValueType>, E>, MetaTxnError> {
debug!(name_ident :? =name_ident; "KVPbCrudApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let seq_meta = self.get_pb(name_ident).await?;
let seq = seq_meta.seq();

let updated = match update(seq_meta.clone()) {
Ok(Some(x)) => x,
Ok(None) => return Ok(Ok(Change::new(seq_meta.clone(), seq_meta))),
Err(err) => return Ok(Err(err)),
};

let transition = self
.upsert_pb(
&UpsertPB::insert(name_ident.clone(), updated).with(MatchSeq::Exact(seq)),
)
.await?;

if transition.is_changed() {
return Ok(Ok(transition));
}
}
}
}

impl<K, T> KVPbCrudApi<K> for T
where
T: KVApi<Error = MetaError> + ?Sized,
K: kvapi::Key + Clone + Send + Sync + 'static,
K::ValueType: FromToProto + Clone + Send + Sync + 'static,
{
}
1 change: 1 addition & 0 deletions src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod data_mask_api;
mod data_mask_api_impl;
pub mod kv_app_error;
pub mod kv_pb_api;
pub mod kv_pb_crud_api;
pub mod meta_txn_error;
pub mod name_id_value_api;
pub mod name_value_api;
Expand Down
36 changes: 0 additions & 36 deletions src/meta/api/src/name_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use databend_common_meta_app::tenant_key::ident::TIdent;
use databend_common_meta_app::tenant_key::resource::TenantResource;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KeyCodec;
use databend_common_meta_types::Change;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqValue;
Expand Down Expand Up @@ -98,41 +97,6 @@ where
Ok(Ok(()))
}

/// Update or insert a `name -> value` mapping.
///
/// The `update` function is called with the previous value and should output the updated to write back.
/// If it outputs `None`, nothing is written back.
async fn upsert_name_value_with(
&self,
name_ident: &TIdent<R, N>,
update: impl Fn(Option<R::ValueType>) -> Option<R::ValueType> + Send,
) -> Result<Change<R::ValueType>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let seq_meta = self.get_pb(name_ident).await?;
let seq = seq_meta.seq();

let updated = match update(seq_meta.clone().into_value()) {
Some(x) => x,
None => return Ok(Change::new(seq_meta.clone(), seq_meta)),
};

let transition = self
.upsert_pb(
&UpsertPB::insert(name_ident.clone(), updated).with(MatchSeq::Exact(seq)),
)
.await?;

if transition.is_changed() {
return Ok(transition);
}
}
}

/// Update an existent `name -> value` mapping.
///
/// The `update` function is called with the previous value
Expand Down
Loading

0 comments on commit 207ee9e

Please sign in to comment.