Save file and shard checksums
Signed-off-by: Lee Smet <[email protected]>
LeeSmet committed Dec 22, 2020
1 parent 0327552 commit a1a7f68
Showing 4 changed files with 131 additions and 12 deletions.
74 changes: 69 additions & 5 deletions src/erasure.rs
@@ -1,5 +1,11 @@
use crate::meta::{Checksum, CHECKSUM_LENGTH};
use blake2::{
digest::{Update, VariableOutput},
VarBlake2b,
};
use log::trace;
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::convert::TryInto;

/// A data encoder is responsible for encoding original data into multiple shards, and decoding
/// multiple shards back to the original data, if sufficient shards are available.
@@ -9,6 +15,10 @@ pub struct Encoder {
parity_shards: usize,
}

/// A data shard resulting from encoding data
#[derive(Debug, Clone)]
pub struct Shard(Vec<u8>);

impl Encoder {
/// Create a new encoder. There can be at most 255 data shards.
///
@@ -33,7 +43,7 @@ impl Encoder {
/// Erasure encode data using ReedSolomon encoding over the galois 8 field. This returns the
/// shards created by the encoding. The order of the shards is important to later retrieve
/// the values.
pub fn encode(&self, mut data: Vec<u8>) -> Vec<Vec<u8>> {
pub fn encode(&self, mut data: Vec<u8>) -> Vec<Shard> {
trace!("encoding data ({} bytes)", data.len());
// pkcs7 padding
let padding_len = self.data_shards - data.len() % self.data_shards;
@@ -42,16 +52,16 @@ impl Encoder {
// the constraints on the amount of data shards when a new encoder is created.
data.extend_from_slice(&vec![padding_len as u8; padding_len]);
// data shards
let mut shards: Vec<Vec<u8>> = data
let mut shards: Vec<Shard> = data
.chunks_exact(data.len() / self.data_shards) // we already padded so this is always exact
.map(Vec::from)
.map(|chunk| Shard::from(Vec::from(chunk)))
.collect();
// add parity shards
// NOTE: we don't need to do a float division with ceiling, since integer division
// essentially always rounds down, and we always add padding
trace!("preparing parity shards");
shards.extend(vec![
vec![0; data.len() / self.data_shards]; // data is padded so this is a perfect division
Shard::from(vec![0; data.len() / self.data_shards]); // data is padded so this is a perfect division
self.parity_shards
]);
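
As a quick worked example of the padding arithmetic above (numbers chosen purely for illustration): with 4 data shards and 10 bytes of input, padding_len = 4 - (10 % 4) = 2, so the padded length is 12 and each data shard holds 3 bytes. An already-aligned input still receives a full block of padding, as pkcs7 requires:

let data_shards = 4usize;
let padding_len = data_shards - 10 % data_shards;
assert_eq!(padding_len, 2); // 10 bytes -> 12 bytes total, 3 bytes per data shard
assert_eq!(data_shards - 8 % data_shards, 4); // aligned input still pads a full block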

@@ -103,6 +113,57 @@
}
}

impl std::ops::Deref for Shard {
type Target = Vec<u8>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<Vec<u8>> for Shard {
fn from(data: Vec<u8>) -> Self {
Shard(data)
}
}

impl AsRef<[u8]> for Shard {
fn as_ref(&self) -> &[u8] {
&self.0
}
}

impl AsMut<[u8]> for Shard {
fn as_mut(&mut self) -> &mut [u8] {
&mut self.0
}
}
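
Taken together, these impls let a Shard be passed anywhere a Vec<u8> or a byte slice is expected, without unwrapping it first. A minimal sketch of the ergonomics this buys (values purely illustrative):

let shard = Shard::from(vec![1u8, 2, 3]);
assert_eq!(shard.len(), 3); // Deref exposes Vec<u8> methods directly
let bytes: &[u8] = shard.as_ref(); // AsRef<[u8]>, e.g. for hashing or I/O
assert_eq!(bytes, &[1, 2, 3]);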

impl Shard {
/// Create a new shard from some data
pub fn new(data: Vec<u8>) -> Self {
Shard(data)
}

/// Generate a checksum for the data in the shard
pub fn checksum(&self) -> Checksum {
let mut hasher = VarBlake2b::new(CHECKSUM_LENGTH).unwrap();
hasher.update(&self.0);

// expect is safe due to the static size, which is known to be valid
hasher
.finalize_boxed()
.as_ref()
.try_into()
.expect("Invalid hash size returned")
}

/// Consume the shard, returning the actual data
pub fn into_inner(self) -> Vec<u8> {
self.0
}
}
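
With checksum in place, a caller can fingerprint every shard as it is produced. A hedged sketch of the flow (the 4 data / 2 parity split is arbitrary; store_data in src/main.rs below is what actually records these checksums):

let encoder = Encoder::new(4, 2);
let shards = encoder.encode(b"some data worth protecting".to_vec());
for (i, shard) in shards.iter().enumerate() {
    // each checksum is a 16-byte blake2b digest, stored alongside the shard's keys
    println!("shard {}: {}", i, hex::encode(shard.checksum()));
}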

#[cfg(test)]
mod tests {
use super::Encoder;
@@ -132,7 +193,10 @@ mod tests {
}
shard_to_erase.shuffle(&mut rand::thread_rng());

let recovered: Vec<Option<Vec<u8>>> = shards.into_iter().map(Some).collect();
let recovered: Vec<Option<Vec<u8>>> = shards
.into_iter()
.map(|shard| Some(shard.into_inner()))
.collect();

let orig_res = encoder.decode(recovered);

39 changes: 33 additions & 6 deletions src/main.rs
@@ -1,5 +1,7 @@
use blake2::{digest::VariableOutput, VarBlake2b};
use futures::future::{join_all, try_join_all};
use log::{debug, info, trace};
use std::convert::TryInto;
use std::fs::File;
use std::io::{Read, Write};
use structopt::StructOpt;
@@ -11,7 +13,7 @@ use zstor_v2::config::{Config, Meta};
use zstor_v2::encryption::{Encryptor, AESGCM};
use zstor_v2::erasure::Encoder;
use zstor_v2::etcd::Etcd;
use zstor_v2::meta::{MetaData, ShardInfo};
use zstor_v2::meta::{Checksum, MetaData, ShardInfo, CHECKSUM_LENGTH};
use zstor_v2::zdb::Zdb;

#[derive(StructOpt, Debug)]
@@ -104,6 +106,7 @@ fn main() -> Result<(), String> {
Cmd::Store { ref file } => {
// start by canonicalizing the path
let file = canonicalize_path(&file)?;
trace!("encoding file {:?}", file);
// make sure file is in root dir
if let Some(ref root) = cfg.virtual_root() {
if !file.starts_with(root) {
@@ -120,7 +123,9 @@
{
return Err("only files can be stored".to_string());
}
trace!("encoding file {:?}", file);

let file_checksum = checksum(&file)?;
debug!("file checksum: {}", hex::encode(file_checksum));

// start reading file to encrypt
trace!("loading file data");
@@ -139,7 +144,7 @@
let encrypted = encryptor.encrypt(&compressed)?;
trace!("encrypted size: {} bytes", encrypted.len());

let metadata = store_data(encrypted, &cfg).await?;
let metadata = store_data(encrypted, file_checksum, &cfg).await?;
cluster.save_meta(&file, &metadata).compat().await?;
}
Cmd::Retrieve { ref file } => {
@@ -167,7 +172,7 @@
let metadata = cluster.load_meta(file).compat().await?;
let decoded = recover_data(&metadata).await?;

let metadata = store_data(decoded, &cfg).await?;
let metadata = store_data(decoded, metadata.checksum().clone(), &cfg).await?;
cluster.save_meta(&file, &metadata).compat().await?;
}
};
@@ -215,7 +220,7 @@ async fn recover_data(metadata: &MetaData) -> Result<Vec<u8>, String> {
Ok(decoded)
}

async fn store_data(data: Vec<u8>, cfg: &Config) -> Result<MetaData, String> {
async fn store_data(data: Vec<u8>, checksum: Checksum, cfg: &Config) -> Result<MetaData, String> {
let encoder = Encoder::new(cfg.data_shards(), cfg.parity_shards());
let shards = encoder.encode(data);
debug!("data encoded");
@@ -229,13 +234,19 @@ async fn store_data(data: Vec<u8>, cfg: &Config) -> Result<MetaData, String> {
handles.push(tokio::spawn(async move {
let mut db = Zdb::new(backend.clone()).await?;
let keys = db.set(&shard).await?;
Ok(ShardInfo::new(shard_idx, keys, backend.clone()))
Ok(ShardInfo::new(
shard_idx,
shard.checksum(),
keys,
backend.clone(),
))
}));
}

let mut metadata = MetaData::new(
cfg.data_shards(),
cfg.parity_shards(),
checksum,
cfg.encryption().clone(),
cfg.compression().clone(),
);
@@ -284,3 +295,19 @@ fn canonicalize_path(path: &std::path::PathBuf) -> Result<std::path::PathBuf, St
},
})
}

/// Get a 16-byte blake2b checksum of a file
fn checksum(file: &std::path::PathBuf) -> Result<Checksum, String> {
trace!("getting file checksum");
let mut file = File::open(file).map_err(|e| e.to_string())?;
// The unwrap here is safe since we know that 16 is a valid output size
let mut hasher = VarBlake2b::new(CHECKSUM_LENGTH).unwrap();
std::io::copy(&mut file, &mut hasher).map_err(|e| format!("failed to get file hash: {}", e))?;

// expect is safe due to the static size, which is known to be valid
Ok(hasher
.finalize_boxed()
.as_ref()
.try_into()
.expect("Invalid hash size returned"))
}
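
Nothing in this diff verifies the file checksum on retrieval yet; it is only computed and stored. A hedged sketch of how a retrieval path could use it to detect corruption (verify_file is hypothetical, not part of this commit):

/// Hypothetical helper: recompute the checksum of a restored file and compare
/// it against the one recorded in the metadata.
fn verify_file(file: &std::path::PathBuf, metadata: &MetaData) -> Result<bool, String> {
    let actual = checksum(file)?;
    Ok(&actual == metadata.checksum())
}
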
28 changes: 27 additions & 1 deletion src/meta.rs
@@ -2,6 +2,11 @@ use crate::config::{Compression, Encryption};
use crate::zdb::{Key, ZdbConnectionInfo};
use serde::{Deserialize, Serialize};

/// The length of file and shard checksums
pub const CHECKSUM_LENGTH: usize = 16;
/// A checksum of a data object
pub type Checksum = [u8; CHECKSUM_LENGTH];

/// MetaData holds all information needed to retrieve, decode, decrypt and decompress shards back
/// to the original data.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -11,6 +16,8 @@ pub struct MetaData {
/// The amount of redundant data shards which are generated when the data is encoded. Essentially,
/// this many shards can be lost while still being able to recover the original data.
parity_shards: usize,
/// Checksum of the full file
checksum: Checksum,
/// configuration to use for the encryption stage
encryption: Encryption,
/// configuration to use for the compression stage
@@ -23,6 +30,7 @@ pub struct MetaData {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ShardInfo {
shard_idx: usize,
checksum: Checksum,
keys: Vec<Key>,
#[serde(flatten)]
ci: ZdbConnectionInfo,
@@ -33,12 +41,14 @@ impl MetaData {
pub fn new(
data_shards: usize,
parity_shards: usize,
checksum: Checksum,
encryption: Encryption,
compression: Compression,
) -> Self {
Self {
data_shards,
parity_shards,
checksum,
encryption,
compression,
shards: Vec::with_capacity(data_shards + parity_shards),
@@ -75,14 +85,25 @@ impl MetaData {
pub fn shards(&self) -> &[ShardInfo] {
&self.shards
}

/// Return the checksum of the file
pub fn checksum(&self) -> &Checksum {
&self.checksum
}
}

impl ShardInfo {
/// Create a new ShardInfo from the shard index, its checksum, the keys under which the data
/// is stored, and the connection info for the zdb (namespace)
pub fn new(shard_idx: usize, keys: Vec<Key>, ci: ZdbConnectionInfo) -> Self {
pub fn new(
shard_idx: usize,
checksum: Checksum,
keys: Vec<Key>,
ci: ZdbConnectionInfo,
) -> Self {
Self {
shard_idx,
checksum,
keys,
ci,
}
@@ -103,4 +124,9 @@ impl ShardInfo {
pub fn key(&self) -> &[Key] {
&self.keys
}

/// Get the checksum of this shard
pub fn checksum(&self) -> &Checksum {
&self.checksum
}
}
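
A hedged sketch of how these accessors might be consumed during recovery (shard_intact is illustrative only, reusing Shard from src/erasure.rs):

use crate::erasure::Shard;

/// Hypothetical check: does the data fetched for a shard still match the
/// checksum recorded when it was stored?
pub fn shard_intact(info: &ShardInfo, data: Vec<u8>) -> bool {
    Shard::new(data).checksum() == *info.checksum()
}
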
2 changes: 2 additions & 0 deletions src/test_etcd.rs
@@ -38,6 +38,7 @@ fn main() {
let mut data = MetaData::new(
1,
2,
[0; 16],
Encryption::new(
"AES",
&SymmetricKey::new([
@@ -49,6 +50,7 @@
);
data.add_shard(ShardInfo::new(
0,
[0; 16],
vec![0],
ZdbConnectionInfo::new("[::1]:9900".parse().unwrap(), None, None),
));
