Skip to content
This repository has been archived by the owner on Jun 8, 2022. It is now read-only.

Commit

Permalink
make sector access a relative path, allowing miners to move sector an…
Browse files Browse the repository at this point in the history
…d metadata directories (#16)

* feat(don't persist sector path, but rather access - allows moving sector builder dir)

* fixup(rust fmt)

* bug(update test, which really should be rewritten)

this test breaks the abstraction provided by the SectorManager by making assumptions about what sector access is (it assumes that it's a file path)

* bug(more test updates)

* bug(more test updates)

* bug(more test updates)

* bug(more test updates)

* bug(more test updates)

* refactor(refactor static method as free function as per @dig)
  • Loading branch information
laser authored Jul 31, 2019
1 parent eaaae29 commit 92beb5a
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 72 deletions.
44 changes: 30 additions & 14 deletions sector-builder-ffi/examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ extern crate scopeguard;

include!(concat!(env!("OUT_DIR"), "/libsector_builder_ffi.rs"));

use std::env;
use std::error::Error;
use std::io::Write;
use std::ptr;
Expand All @@ -17,6 +16,7 @@ use std::sync::atomic::AtomicPtr;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::{env, fs};

use byteorder::{LittleEndian, WriteBytesExt};
use ffi_toolkit::{c_str_to_rust_str, free_c_str, rust_str_to_c_str};
Expand Down Expand Up @@ -132,9 +132,12 @@ struct ConfigurableSizes {
}

unsafe fn sector_builder_lifecycle(use_live_store: bool) -> Result<(), Box<Error>> {
let metadata_dir = tempfile::tempdir().unwrap();
let staging_dir = tempfile::tempdir().unwrap();
let sealed_dir = tempfile::tempdir().unwrap();
let metadata_dir_a = tempfile::tempdir().unwrap();
let metadata_dir_b = tempfile::tempdir().unwrap();
let staging_dir_a = tempfile::tempdir().unwrap();
let staging_dir_b = tempfile::tempdir().unwrap();
let sealed_dir_a = tempfile::tempdir().unwrap();
let sealed_dir_b = tempfile::tempdir().unwrap();

let sizes = if use_live_store {
ConfigurableSizes {
Expand Down Expand Up @@ -165,9 +168,9 @@ unsafe fn sector_builder_lifecycle(use_live_store: bool) -> Result<(), Box<Error
};

let (sector_builder_a, max_bytes) = create_sector_builder(
&metadata_dir,
&staging_dir,
&sealed_dir,
&metadata_dir_a,
&staging_dir_a,
&sealed_dir_a,
u64_to_fr_safe(0),
123,
sizes.sector_class,
Expand Down Expand Up @@ -256,12 +259,25 @@ unsafe fn sector_builder_lifecycle(use_live_store: bool) -> Result<(), Box<Error
// drop the first sector builder, relinquishing any locks on persistence
sector_builder_ffi_destroy_sector_builder(sector_builder_a);

// create a new sector builder using same prover id, which should
// initialize with metadata persisted by previous sector builder
// migrate staged sectors, sealed sectors, and sector builder metadata to
// new directory (overwrites destination directory)
let renames = vec![
(metadata_dir_a.as_ref(), metadata_dir_b.as_ref()),
(staging_dir_a.as_ref(), staging_dir_b.as_ref()),
(sealed_dir_a.as_ref(), sealed_dir_b.as_ref()),
];

for (from, to) in renames {
fs::rename(from, to).expect(&format!("could not rename from {:?} to {:?}", from, to));
}

// create a new sector builder using the new staged sector dir and original
// prover id, which will initialize with metadata persisted by previous
// sector builder
let (sector_builder_b, _) = create_sector_builder(
&metadata_dir,
&staging_dir,
&sealed_dir,
&metadata_dir_b,
&staging_dir_b,
&sealed_dir_b,
u64_to_fr_safe(0),
123,
sizes.sector_class,
Expand Down Expand Up @@ -479,8 +495,8 @@ unsafe fn sector_builder_lifecycle(use_live_store: bool) -> Result<(), Box<Error
assert_eq!(format!("{:x?}", bytes_in), format!("{:x?}", bytes_out));
}

// verify that the comm_p of the fourth piece generated by sealing matches the comm_p generated
// directly with its bytes written to a piece file
// verify that the comm_p of the fourth piece generated by sealing matches
// the comm_p generated directly with its bytes written to a piece file
{
let resp = sector_builder_ffi_get_sealed_sectors(sector_builder_b);
defer!(sector_builder_ffi_destroy_get_sealed_sectors_response(resp));
Expand Down
62 changes: 34 additions & 28 deletions sector-builder/src/disk_backed_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fs::{create_dir_all, remove_file, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;
use std::path::{Path, PathBuf};

use filecoin_proofs::fr32::{
almost_truncate_to_unpadded_bytes, target_unpadded_bytes, write_padded,
Expand All @@ -11,17 +11,27 @@ use crate::error::SectorManagerErr;
use crate::store::{ProofsConfig, SectorConfig, SectorManager, SectorStore};
use crate::util;

// These sizes are for SEALED sectors. They are used to calculate the values of setup parameters.
// They can be overridden by setting the corresponding environment variable (with FILECOIN_PROOFS_ prefix),
// but this is not recommended, since some sealed sector sizes are invalid. If you must set this manually,
// ensure the chosen sector size is a multiple of 32.

pub struct DiskManager {
staging_path: String,
sealed_path: String,
}

fn sector_path<P: AsRef<Path>>(sector_dir: P, access: &str) -> PathBuf {
let mut file_path = PathBuf::from(sector_dir.as_ref());
file_path.push(access);

file_path
}

impl SectorManager for DiskManager {
fn sealed_sector_path(&self, access: &str) -> PathBuf {
sector_path(&self.sealed_path, access)
}

fn staged_sector_path(&self, access: &str) -> PathBuf {
sector_path(&self.staging_path, access)
}

fn new_sealed_sector_access(&self) -> Result<String, SectorManagerErr> {
self.new_sector_access(Path::new(&self.sealed_path))
}
Expand All @@ -33,7 +43,7 @@ impl SectorManager for DiskManager {
fn num_unsealed_bytes(&self, access: &str) -> Result<u64, SectorManagerErr> {
OpenOptions::new()
.read(true)
.open(access)
.open(self.staged_sector_path(access))
.map_err(|err| SectorManagerErr::CallerError(format!("{:?}", err)))
.map(|mut f| {
target_unpadded_bytes(&mut f)
Expand All @@ -44,7 +54,10 @@ impl SectorManager for DiskManager {

fn truncate_unsealed(&self, access: &str, size: u64) -> Result<(), SectorManagerErr> {
// I couldn't wrap my head around all ths result mapping, so here it is all laid out.
match OpenOptions::new().write(true).open(&access) {
match OpenOptions::new()
.write(true)
.open(self.staged_sector_path(access))
{
Ok(mut file) => match almost_truncate_to_unpadded_bytes(&mut file, size) {
Ok(padded_size) => match file.set_len(padded_size as u64) {
Ok(_) => Ok(()),
Expand All @@ -65,7 +78,7 @@ impl SectorManager for DiskManager {
OpenOptions::new()
.read(true)
.write(true)
.open(access)
.open(self.staged_sector_path(access))
.map_err(|err| SectorManagerErr::CallerError(format!("{:?}", err)))
.and_then(|mut file| {
write_padded(data, &mut file)
Expand All @@ -75,7 +88,8 @@ impl SectorManager for DiskManager {
}

fn delete_staging_sector_access(&self, access: &str) -> Result<(), SectorManagerErr> {
remove_file(access).map_err(|err| SectorManagerErr::CallerError(format!("{:?}", err)))
remove_file(self.staged_sector_path(access))
.map_err(|err| SectorManagerErr::CallerError(format!("{:?}", err)))
}

fn read_raw(
Expand All @@ -86,7 +100,7 @@ impl SectorManager for DiskManager {
) -> Result<Vec<u8>, SectorManagerErr> {
OpenOptions::new()
.read(true)
.open(access)
.open(self.staged_sector_path(access))
.map_err(|err| SectorManagerErr::CallerError(format!("{:?}", err)))
.and_then(|mut file| -> Result<Vec<u8>, SectorManagerErr> {
file.seek(SeekFrom::Start(start_offset))
Expand All @@ -104,25 +118,17 @@ impl SectorManager for DiskManager {

impl DiskManager {
fn new_sector_access(&self, root: &Path) -> Result<String, SectorManagerErr> {
let pbuf = root.join(util::rand_alpha_string(32));
let access = util::rand_alpha_string(32);
let file_path = root.join(&access);

create_dir_all(root)
.map_err(|err| SectorManagerErr::ReceiverError(format!("{:?}", err)))
.and_then(|_| {
File::create(&pbuf)
File::create(&file_path)
.map(|_| 0)
.map_err(|err| SectorManagerErr::ReceiverError(format!("{:?}", err)))
})
.and_then(|_| {
pbuf.to_str().map_or_else(
|| {
Err(SectorManagerErr::ReceiverError(
"could not create pbuf".to_string(),
))
},
|str_ref| Ok(str_ref.to_owned()),
)
})
.map(|_| access)
}
}

Expand Down Expand Up @@ -229,8 +235,8 @@ pub mod tests {
)
}

fn read_all_bytes(access: &str) -> Vec<u8> {
let mut file = File::open(access).unwrap();
fn read_all_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
let mut file = File::open(path.as_ref()).unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).unwrap();

Expand Down Expand Up @@ -298,7 +304,7 @@ pub mod tests {
.expect("failed to write");

// buffer the file's bytes into memory after writing bytes
let buf = read_all_bytes(&access);
let buf = read_all_bytes(mgr.staged_sector_path(&access));
let output_bytes_written = buf.len();

// ensure that we reported the correct number of written bytes
Expand All @@ -309,7 +315,7 @@ pub mod tests {
assert_eq!(8u8, buf[32]);

// read the file into memory again - this time after we truncate
let buf = read_all_bytes(&access);
let buf = read_all_bytes(mgr.staged_sector_path(&access));

// ensure the file we wrote to contains the expected bytes
assert_eq!(504, buf.len());
Expand All @@ -335,7 +341,7 @@ pub mod tests {
.expect("failed to truncate");

// read the file into memory again - this time after we truncate
let buf = read_all_bytes(&access);
let buf = read_all_bytes(mgr.staged_sector_path(&access));

// All but last bytes are identical.
assert_eq!(contents[0..num_bytes], buf[0..num_bytes]);
Expand Down
20 changes: 11 additions & 9 deletions sector-builder/src/helpers/retrieve_piece.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::path::PathBuf;
use std::sync::Arc;

use filecoin_proofs::get_unsealed_range;
Expand Down Expand Up @@ -46,7 +45,7 @@ fn retrieve_piece_aux<'a>(
sealed_sector: &SealedSectorMetadata,
prover_id: &[u8; 31],
piece_key: &'a str,
staging_sector_access: &'a str,
staged_sector_access: &'a str,
) -> error::Result<(UnpaddedBytesAmount, Vec<u8>)> {
let piece = sealed_sector
.pieces
Expand All @@ -69,8 +68,12 @@ fn retrieve_piece_aux<'a>(

let num_bytes_unsealed = get_unsealed_range(
(*sector_store).proofs_config().porep_config(),
&PathBuf::from(sealed_sector.sector_access.clone()),
&PathBuf::from(staging_sector_access),
sector_store
.manager()
.sealed_sector_path(&sealed_sector.sector_access),
sector_store
.manager()
.staged_sector_path(staged_sector_access),
prover_id,
&sector_id_as_bytes(sealed_sector.sector_id)?,
get_piece_start_byte(&piece_lengths, piece.num_bytes),
Expand All @@ -87,11 +90,10 @@ fn retrieve_piece_aux<'a>(
return Err(err_unrecov(s).into());
}

let piece_bytes = sector_store.manager().read_raw(
&staging_sector_access.to_string(),
0,
num_bytes_unsealed,
)?;
let piece_bytes =
sector_store
.manager()
.read_raw(staged_sector_access, 0, num_bytes_unsealed)?;

Ok((num_bytes_unsealed, piece_bytes))
}
9 changes: 6 additions & 3 deletions sector-builder/src/helpers/seal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::path::PathBuf;
use std::sync::Arc;

use filecoin_proofs::types::UnpaddedBytesAmount;
Expand Down Expand Up @@ -36,8 +35,12 @@ pub fn seal(
piece_inclusion_proofs,
} = seal_internal(
(*sector_store).proofs_config().porep_config(),
&PathBuf::from(staged_sector.sector_access.clone()),
&PathBuf::from(sealed_sector_access.clone()),
sector_store
.manager()
.staged_sector_path(&staged_sector.sector_access),
sector_store
.manager()
.sealed_sector_path(&sealed_sector_access),
prover_id,
&sector_id_as_bytes(staged_sector.sector_id)?,
&piece_lengths,
Expand Down
13 changes: 9 additions & 4 deletions sector-builder/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<T: KeyValueStore, S: SectorStore> SectorMetadataManager<T, S> {
return_channel: mpsc::SyncSender<Result<GeneratePoStDynamicSectorsCountOutput>>,
) {
// reduce our sealed sector state-map to a mapping of comm_r to sealed
// sector access (AKA path to sealed sector file)
// sector access
let comm_r_to_sector_access: HashMap<[u8; 32], String> = self
.state
.sealed
Expand All @@ -179,10 +179,15 @@ impl<T: KeyValueStore, S: SectorStore> SectorMetadataManager<T, S> {

let mut input_parts: Vec<(Option<String>, [u8; 32])> = Default::default();

// eject from this loop with an error if we've been provided a comm_r
// which does not correspond to any sealed sector metadata
for comm_r in comm_rs {
input_parts.push((comm_r_to_sector_access.get(comm_r).cloned(), *comm_r))
let access = comm_r_to_sector_access.get(comm_r).and_then(|access| {
self.sector_store
.manager()
.sealed_sector_path(access)
.to_str()
.map(str::to_string)
});
input_parts.push((access, *comm_r));
}

let mut seed = [0; 32];
Expand Down
Loading

0 comments on commit 92beb5a

Please sign in to comment.