Skip to content

Commit

Permalink
Add restatectl metadata get/patch subcommands
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Oct 2, 2024
1 parent 9cbccb4 commit d2c37f7
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 0 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions tools/restatectl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ restate-types = { workspace = true }
restate-wal-protocol = { workspace = true }

anyhow = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
chrono = { workspace = true }
clap = { version = "4.1", features = ["derive", "env", "wrap_help", "color"] }
clap-verbosity-flag = { version = "2.0.1" }
cling = { version = "0.1.0", default-features = false, features = ["derive"] }
crossterm = { version = "0.27.0" }
ctrlc = { version = "3.4" }
derive_more = { workspace = true }
futures-util = { workspace = true }
hyper-util = { workspace = true }
itertools = { workspace = true }
json-patch = "2.0.0"
prost-types = { workspace = true }
rlimit = { workspace = true }
serde = { workspace = true }
Expand Down
23 changes: 23 additions & 0 deletions tools/restatectl/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use restate_types::net::AdvertisedAddress;

use crate::commands::dump::Dump;
use crate::commands::log::Log;
use crate::commands::metadata::Metadata;
use crate::commands::node::Node;

#[derive(Run, Parser, Clone)]
Expand All @@ -28,6 +29,8 @@ pub struct CliApp {
pub common_opts: CommonOpts,
#[clap(flatten)]
pub connection: ConnectionInfo,
#[clap(flatten)]
pub metadata_store: MetadataStoreOpts,
#[clap(subcommand)]
pub cmd: Command,
}
Expand All @@ -39,6 +42,23 @@ pub struct ConnectionInfo {
pub cluster_controller: AdvertisedAddress,
}

#[derive(Parser, Collect, Debug, Clone)]
pub struct MetadataStoreOpts {
/// Metadata store host:port (e.g. http://localhost:5123/)
#[clap(long, value_hint= clap::ValueHint::Url, default_value_t = AdvertisedAddress::from_str("http://localhost:5123/").unwrap(), global = true)]
pub address: AdvertisedAddress,
#[clap(long, default_value_t = MetadataStoreType::Embedded, global = true)]
/// Metadata store type
pub store: MetadataStoreType,
}

#[derive(Parser, Collect, Debug, Clone, derive_more::Display, derive_more::FromStr)]
pub enum MetadataStoreType {
Local,
Embedded,
Etcd,
}

#[derive(Run, Subcommand, Clone)]
pub enum Command {
/// Dump various metadata from the cluster controller
Expand All @@ -50,6 +70,9 @@ pub enum Command {
/// Cluster node status
#[clap(subcommand)]
Nodes(Node),
/// Cluster metadata
#[clap(subcommand)]
Metadata(Metadata),
}

fn init(common_opts: &CommonOpts) {
Expand Down
78 changes: 78 additions & 0 deletions tools/restatectl/src/commands/metadata/get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::PathBuf;

use bytestring::ByteString;
use clap::Parser;
use cling::{Collect, Run};
use tracing::debug;

use restate_rocksdb::RocksDbManager;
use restate_types::config::Configuration;
use restate_types::live::Live;

use crate::commands::metadata::GenericMetadataValue;
use crate::environment::metadata_store;
use crate::environment::task_center::run_in_task_center;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[clap()]
#[cling(run = "get_value")]
pub struct GetValueOpts {
/// Restate configuration file
#[arg(
short,
long = "config-file",
env = "RESTATE_CONFIG",
value_name = "FILE"
)]
config_file: Option<PathBuf>,

/// The key to get
#[arg(short, long)]
key: String,
}

async fn get_value(opts: &GetValueOpts) -> anyhow::Result<()> {
let value = run_in_task_center(
opts.config_file.as_ref(),
|config, task_center| async move {
let rocksdb_manager =
RocksDbManager::init(Configuration::mapped_updateable(|c| &c.common));
debug!("RocksDB Initialized");

let metadata_store_client = metadata_store::start_metadata_store(
config.common.metadata_store_client.clone(),
Live::from_value(config.metadata_store.clone()).boxed(),
Live::from_value(config.metadata_store.clone())
.map(|c| &c.rocksdb)
.boxed(),
&task_center,
)
.await?;
debug!("Metadata store client created");

let value: Option<GenericMetadataValue> = metadata_store_client
.get(ByteString::from(opts.key.as_str()))
.await
.map_err(|e| anyhow::anyhow!("Failed to get value: {}", e))?;

rocksdb_manager.shutdown().await;
anyhow::Ok(value)
},
)
.await?;

let value = serde_json::to_string_pretty(&value).map_err(|e| anyhow::anyhow!(e))?;
println!("{}", value);

Ok(())
}
46 changes: 46 additions & 0 deletions tools/restatectl/src/commands/metadata/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod get;
mod patch;

use cling::prelude::*;
use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};

#[derive(Run, Subcommand, Clone)]
pub enum Metadata {
/// Get a single key's value from the metadata store
Get(get::GetValueOpts),
/// Patch a value stored in the metadata store
Patch(patch::PatchValueOpts),
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GenericMetadataValue {
// We assume that the concrete serialized type's encoded version field is called "version".
version: Version,

#[serde(flatten)]
data: serde_json::Map<String, serde_json::Value>,
}

flexbuffers_storage_encode_decode!(GenericMetadataValue);

impl GenericMetadataValue {
pub fn to_json_value(&self) -> serde_json::Value {
serde_json::Value::Object(self.data.clone())
}
}

impl Versioned for GenericMetadataValue {
fn version(&self) -> Version {
self.version
}
}
138 changes: 138 additions & 0 deletions tools/restatectl/src/commands/metadata/patch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::PathBuf;

use bytestring::ByteString;
use clap::Parser;
use cling::{Collect, Run};
use tracing::debug;

use restate_core::metadata_store::Precondition;
use restate_rocksdb::RocksDbManager;
use restate_types::config::Configuration;
use restate_types::live::Live;
use restate_types::Version;

use crate::commands::metadata::GenericMetadataValue;
use crate::environment::metadata_store;
use crate::environment::task_center::run_in_task_center;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[clap()]
#[cling(run = "patch_value")]
pub struct PatchValueOpts {
/// Set a configuration file to use for Restate.
/// For more details, check the documentation.
#[arg(
short,
long = "config-file",
env = "RESTATE_CONFIG",
value_name = "FILE"
)]
config_file: Option<PathBuf>,

/// The key to patch
#[arg(short, long)]
key: String,

/// The JSON patch to apply to value
#[arg(short, long)]
patch: String,

/// Expected version for conditional update
#[arg(short = 'e', long)]
version: Option<u32>,

/// Preview the change without applying it
#[arg(short = 'n', long, default_value_t = false)]
dry_run: bool,
}

async fn patch_value(opts: &PatchValueOpts) -> anyhow::Result<()> {
let patch: json_patch::Patch = serde_json::from_str(opts.patch.as_str())
.map_err(|e| anyhow::anyhow!("Parsing JSON patch: {}", e))?;

let value = run_in_task_center(
opts.config_file.as_ref(),
|config, task_center| async move {
let rocksdb_manager =
RocksDbManager::init(Configuration::mapped_updateable(|c| &c.common));
debug!("RocksDB Initialized");

let metadata_store_client = metadata_store::start_metadata_store(
config.common.metadata_store_client.clone(),
Live::from_value(config.metadata_store.clone()).boxed(),
Live::from_value(config.metadata_store.clone())
.map(|c| &c.rocksdb)
.boxed(),
&task_center,
)
.await?;
debug!("Metadata store client created");

let value: GenericMetadataValue = metadata_store_client
.get(ByteString::from(opts.key.as_str()))
.await?
.ok_or(anyhow::anyhow!("Key not found: '{}'", opts.key))?;

if let Some(expected_version) = opts.version {
if value.version != Version::from(expected_version) {
anyhow::bail!(
"Version mismatch: expected v{}, got {:#} from store",
expected_version,
value.version
);
}
}

let mut document = value.to_json_value();
let result = match json_patch::patch(&mut document, &patch) {
Ok(_) => {
let new_value = GenericMetadataValue {
version: value.version.next(),
data: serde_json::from_value(document.clone())
.map_err(|e| anyhow::anyhow!(e))?,
};

if !opts.dry_run {
debug!(
"Updating metadata key '{}' with expected {:?} version to {:?}",
opts.key, value.version, new_value.version
);

metadata_store_client
.put(
ByteString::from(opts.key.as_str()),
&new_value,
Precondition::MatchesVersion(value.version),
)
.await
.map_err(|e| anyhow::anyhow!("Store update failed: {}", e))?
} else {
println!("Dry run - updated value:\n");
}

Ok(new_value)
}
Err(e) => Err(anyhow::anyhow!("Patch failed: {}", e)),
};

rocksdb_manager.shutdown().await;
result
},
)
.await?;

let value = serde_json::to_string_pretty(&value).map_err(|e| anyhow::anyhow!(e))?;
println!("{}", value);

Ok(())
}
1 change: 1 addition & 0 deletions tools/restatectl/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
mod display_util;
pub mod dump;
pub mod log;
pub mod metadata;
pub mod node;

0 comments on commit d2c37f7

Please sign in to comment.