Refactor/sealer accepts just what it needs (#55)
* refactor(store): move store out of sealer thread

* refactor(threading): move all mutable state into scheduler thread

* refactor(naming): prefix task with consumer name

* refactor(channels): remove channels from SectorMetadataManager

* refactor(extract): move metadata manager into separate file

* refactor(dig): use impl AsRef<Path> as per PR feedback

* refactor(dig): move SectorBuilderState construction into method

* refactor(dig): import at top level

* fix(rebase): change constant names to reflect new API
laser authored Sep 18, 2019
1 parent 9b238e2 commit be273a9
Showing 15 changed files with 769 additions and 692 deletions.
164 changes: 80 additions & 84 deletions sector-builder/src/builder.rs
@@ -3,98 +3,111 @@ use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};

use filecoin_proofs::error::ExpectWithBacktrace;
use filecoin_proofs::types::{PaddedBytesAmount, PoRepConfig, PoStConfig, SectorClass};
use filecoin_proofs::types::{PoRepConfig, PoStConfig, SectorClass};
use storage_proofs::sector::SectorId;

use crate::constants::*;
use crate::disk_backed_storage::new_sector_store;
use crate::error::{Result, SectorBuilderErr};
use crate::helpers;
use crate::helpers::SnapshotKey;
use crate::kv_store::{KeyValueStore, SledKvs};
use crate::metadata::*;
use crate::scheduler::{PerformHealthCheck, Request, Scheduler};
use crate::sealer::*;
use crate::metadata_manager::SectorMetadataManager;
use crate::scheduler::{PerformHealthCheck, Scheduler, SchedulerTask};
use crate::state::SectorBuilderState;
use crate::worker::*;
use crate::SectorStore;

const FATAL_NOLOAD: &str = "could not load snapshot";

pub struct SectorBuilder {
// Prevents FFI consumers from queueing behind long-running seal operations.
sealers_tx: mpsc::Sender<SealerInput>,
worker_tx: mpsc::Sender<WorkerTask>,

// For additional seal concurrency, add more workers here.
sealers: Vec<SealerWorker>,
workers: Vec<Worker>,

// The main worker's queue.
scheduler_tx: mpsc::SyncSender<Request>,
scheduler_tx: mpsc::SyncSender<SchedulerTask>,

// The main worker. Owns all mutable state for the SectorBuilder.
scheduler: Scheduler,

// Configures size of proofs and sectors managed by the SectorBuilder.
sector_class: SectorClass,
}

impl SectorBuilder {
// Initialize and return a SectorBuilder from metadata persisted to disk if
// it exists. Otherwise, initialize and return a fresh SectorBuilder. The
// metadata key is equal to the prover_id.
pub fn init_from_metadata<S: Into<String>>(
pub fn init_from_metadata(
sector_class: SectorClass,
last_committed_sector_id: SectorId,
metadata_dir: S,
metadata_dir: impl AsRef<Path>,
prover_id: [u8; 31],
sealed_sector_dir: S,
staged_sector_dir: S,
sealed_sector_dir: impl AsRef<Path>,
staged_sector_dir: impl AsRef<Path>,
max_num_staged_sectors: u8,
) -> Result<SectorBuilder> {
ensure_parameter_cache_hydrated(sector_class)?;

let kv_store = Arc::new(WrappedKeyValueStore {
inner: Box::new(SledKvs::initialize(metadata_dir.into())?),
});

// Initialize a SectorStore and wrap it in an Arc so we can access it
// from multiple threads. Our implementation assumes that the
// SectorStore is safe for concurrent access.
let sector_store = Arc::new(new_sector_store(
sector_class,
sealed_sector_dir.into(),
staged_sector_dir.into(),
));

// Configure the main worker's rendezvous channel.
let (main_tx, main_rx) = mpsc::sync_channel(0);
// Configure the scheduler's rendezvous channel.
let (scheduler_tx, scheduler_rx) = mpsc::sync_channel(0);

// Configure seal queue workers and channels.
let (seal_tx, seal_workers) = {
// Configure workers and channels.
let (worker_tx, workers) = {
let (tx, rx) = mpsc::channel();
let rx = Arc::new(Mutex::new(rx));

let workers = (0..NUM_SEAL_WORKERS)
.map(|n| SealerWorker::start(n, rx.clone(), sector_store.clone(), prover_id))
let workers = (0..NUM_WORKERS)
.map(|n| Worker::start(n, rx.clone(), prover_id))
.collect();

(tx, workers)
};

let SectorClass(sector_size, _) = sector_class;
let sector_size = sector_class.0.into();

// Initialize the key/value store in which we store metadata
// snapshots.
let kv_store = SledKvs::initialize(metadata_dir).expect("failed to initialize K/V store");

// Configure main worker.
let main_worker = Scheduler::start_with_metadata(
main_rx,
main_tx.clone(),
seal_tx.clone(),
kv_store.clone(),
sector_store.clone(),
last_committed_sector_id,
// Initialize a SectorStore and wrap it in an Arc so we can access it
// from multiple threads. Our implementation assumes that the
// SectorStore is safe for concurrent access.
let sector_store = new_sector_store(sector_class, sealed_sector_dir, staged_sector_dir);

// Build the scheduler's initial state. If available, we
// reconstitute this state from persisted metadata. If not, we
// create it from scratch.
let state = {
let loaded =
helpers::load_snapshot(&kv_store, &SnapshotKey::new(prover_id, sector_size))
.expects(FATAL_NOLOAD)
.map(Into::into);

loaded.unwrap_or_else(|| SectorBuilderState::new(last_committed_sector_id))
};

let max_user_bytes_per_staged_sector =
sector_store.sector_config().max_unsealed_bytes_per_sector();

let m = SectorMetadataManager {
kv_store,
sector_store,
state,
max_num_staged_sectors,
max_user_bytes_per_staged_sector,
prover_id,
PaddedBytesAmount::from(sector_size),
);
sector_size,
};

let scheduler = Scheduler::start(scheduler_tx.clone(), scheduler_rx, worker_tx.clone(), m);

Ok(SectorBuilder {
scheduler_tx: main_tx,
scheduler: main_worker,
sealers_tx: seal_tx,
sealers: seal_workers,
sector_class,
scheduler_tx,
scheduler,
worker_tx,
workers,
})
}

@@ -108,54 +121,57 @@ impl SectorBuilder {
store_until: SecondsSinceEpoch,
) -> Result<SectorId> {
log_unrecov(self.run_blocking(|tx| {
Request::AddPiece(piece_key, piece_bytes_amount, piece_path, store_until, tx)
SchedulerTask::AddPiece(piece_key, piece_bytes_amount, piece_path, store_until, tx)
}))
}

// Returns sealing status for the sector with specified id. If no sealed or
// staged sector exists with the provided id, produce an error.
pub fn get_seal_status(&self, sector_id: SectorId) -> Result<SealStatus> {
log_unrecov(self.run_blocking(|tx| Request::GetSealStatus(sector_id, tx)))
log_unrecov(self.run_blocking(|tx| SchedulerTask::GetSealStatus(sector_id, tx)))
}

// Unseals the sector containing the referenced piece and returns its
// bytes. Produces an error if this sector builder does not have a sealed
// sector containing the referenced piece.
pub fn read_piece_from_sealed_sector(&self, piece_key: String) -> Result<Vec<u8>> {
log_unrecov(self.run_blocking(|tx| Request::RetrievePiece(piece_key, tx)))
log_unrecov(self.run_blocking(|tx| SchedulerTask::RetrievePiece(piece_key, tx)))
}

// For demo purposes. Schedules sealing of all staged sectors.
pub fn seal_all_staged_sectors(&self) -> Result<()> {
log_unrecov(self.run_blocking(Request::SealAllStagedSectors))
log_unrecov(self.run_blocking(SchedulerTask::SealAllStagedSectors))
}

// Returns all sealed sector metadata.
pub fn get_sealed_sectors(&self, check_health: bool) -> Result<Vec<GetSealedSectorResult>> {
log_unrecov(
self.run_blocking(|tx| Request::GetSealedSectors(PerformHealthCheck(check_health), tx)),
)
log_unrecov(self.run_blocking(|tx| {
SchedulerTask::GetSealedSectors(PerformHealthCheck(check_health), tx)
}))
}

// Returns all staged sector metadata.
pub fn get_staged_sectors(&self) -> Result<Vec<StagedSectorMetadata>> {
log_unrecov(self.run_blocking(Request::GetStagedSectors))
log_unrecov(self.run_blocking(SchedulerTask::GetStagedSectors))
}

// Generates a proof-of-spacetime. Blocks the calling thread.
// Generates a proof-of-spacetime.
pub fn generate_post(
&self,
comm_rs: &[[u8; 32]],
challenge_seed: &[u8; 32],
faults: Vec<SectorId>,
) -> Result<Vec<u8>> {
log_unrecov(self.run_blocking(|tx| {
Request::GeneratePoSt(Vec::from(comm_rs), *challenge_seed, faults, tx)
SchedulerTask::GeneratePoSt(Vec::from(comm_rs), *challenge_seed, faults, tx)
}))
}

// Run a task, blocking on the return channel.
fn run_blocking<T, F: FnOnce(mpsc::SyncSender<T>) -> Request>(&self, with_sender: F) -> T {
fn run_blocking<T, F: FnOnce(mpsc::SyncSender<T>) -> SchedulerTask>(
&self,
with_sender: F,
) -> T {
let (tx, rx) = mpsc::sync_channel(0);

self.scheduler_tx
@@ -165,25 +181,20 @@

rx.recv().expects(FATAL_NORECV_TASK)
}

// Return the SectorBuilder's configured sector class.
pub fn get_sector_class(&self) -> SectorClass {
self.sector_class
}
}

impl Drop for SectorBuilder {
fn drop(&mut self) {
// Shut down main worker and sealers, too.
let _ = self
.scheduler_tx
.send(Request::Shutdown)
.send(SchedulerTask::Shutdown)
.map_err(|err| println!("err sending Shutdown to scheduler: {:?}", err));

for _ in &mut self.sealers {
for _ in &mut self.workers {
let _ = self
.sealers_tx
.send(SealerInput::Shutdown)
.worker_tx
.send(WorkerTask::Shutdown)
.map_err(|err| println!("err sending Shutdown to sealer: {:?}", err));
}

@@ -196,7 +207,7 @@ impl Drop for SectorBuilder {
.map_err(|err| println!("err joining scheduler thread: {:?}", err));
}

for worker in &mut self.sealers {
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
let _ = thread
.join()
@@ -206,21 +217,6 @@
}
}

pub struct WrappedKeyValueStore<T: KeyValueStore> {
inner: Box<T>,
}
impl<T: KeyValueStore> WrappedKeyValueStore<T> {
pub fn new(inner: T) -> Self {
Self {
inner: Box::new(inner),
}
}

pub fn inner(&self) -> &T {
&self.inner
}
}

/// Checks the parameter cache for the given sector size.
/// Returns an `Err` if it is not hydrated.
fn ensure_parameter_cache_hydrated(sector_class: SectorClass) -> Result<()> {
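Aside: a minimal, self-contained sketch (not from this commit) of the worker-pool wiring used in builder.rs above — one mpsc sender feeding several workers that share the receiver behind Arc<Mutex<...>>, with one Shutdown message per worker as in the Drop impl. The Task type and counts here are illustrative.

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Hypothetical task type standing in for WorkerTask.
enum Task {
    Work(u32),
    Shutdown,
}

fn main() {
    let (tx, rx) = mpsc::channel::<Task>();
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<_> = (0..2)
        .map(|id| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Take the lock, pull one task, then release it before doing the work.
                let task = rx.lock().unwrap().recv().unwrap();
                match task {
                    Task::Work(n) => println!("worker {} handled task {}", id, n),
                    Task::Shutdown => break,
                }
            })
        })
        .collect();

    for n in 0..4 {
        tx.send(Task::Work(n)).unwrap();
    }
    // One Shutdown per worker, mirroring the Drop impl above.
    for _ in &workers {
        tx.send(Task::Shutdown).unwrap();
    }
    for w in workers {
        w.join().unwrap();
    }
}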
2 changes: 1 addition & 1 deletion sector-builder/src/constants.rs
@@ -1,4 +1,4 @@
pub const NUM_SEAL_WORKERS: usize = 2;
pub const NUM_WORKERS: usize = 2;

pub const FATAL_NOSEND_TASK: &str = "[run_blocking] could not send";
pub const FATAL_NORECV_TASK: &str = "[run_blocking] could not recv";
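Aside: the FATAL_NOSEND_TASK / FATAL_NORECV_TASK constants above back the run_blocking helper, which pairs every request with a zero-capacity (rendezvous) reply channel. A minimal sketch of that request/reply pattern under assumed names (Request::Add is illustrative, not from the codebase):

use std::sync::mpsc;
use std::thread;

// Illustrative request type; the real SchedulerTask has more variants.
enum Request {
    Add(u64, mpsc::SyncSender<u64>),
    Shutdown,
}

fn main() {
    // Zero-capacity (rendezvous) channel into the scheduler thread.
    let (req_tx, req_rx) = mpsc::sync_channel::<Request>(0);

    let scheduler = thread::spawn(move || {
        while let Ok(req) = req_rx.recv() {
            match req {
                Request::Add(n, reply_tx) => {
                    // Answer on the per-request channel; the caller is blocked in recv().
                    reply_tx.send(n + 1).unwrap();
                }
                Request::Shutdown => break,
            }
        }
    });

    // run_blocking-style call site: make a reply channel, send the request,
    // then block until the scheduler answers.
    let (reply_tx, reply_rx) = mpsc::sync_channel(0);
    req_tx.send(Request::Add(41, reply_tx)).unwrap();
    assert_eq!(reply_rx.recv().unwrap(), 42);

    req_tx.send(Request::Shutdown).unwrap();
    scheduler.join().unwrap();
}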
22 changes: 11 additions & 11 deletions sector-builder/src/disk_backed_storage.rs
@@ -31,8 +31,8 @@ pub enum SectorAccessProto {
}

pub struct DiskManager {
staging_path: String,
sealed_path: String,
staging_path: PathBuf,
sealed_path: PathBuf,

// A sector ID presentation with a defined protocol
sector_access_proto: SectorAccessProto,
@@ -350,15 +350,15 @@ impl SectorStore for ConcreteSectorStore {

pub fn new_sector_store(
sector_class: SectorClass,
sealed_path: String,
staging_path: String,
sealed_sector_dir: impl AsRef<Path>,
staged_sector_dir: impl AsRef<Path>,
) -> ConcreteSectorStore {
// By default, support on-000000000000-dddddddddd format
let default_access_proto = SectorAccessProto::Original(0);

let manager = Box::new(DiskManager {
staging_path,
sealed_path,
staging_path: staged_sector_dir.as_ref().to_owned(),
sealed_path: sealed_sector_dir.as_ref().to_owned(),
sector_access_proto: default_access_proto,
sector_segment_id: 0u32,
});
@@ -411,7 +411,7 @@ pub mod tests {
use std::fs::{create_dir_all, File};
use std::io::{Read, Write};

use filecoin_proofs::constants::{LIVE_SECTOR_SIZE, TEST_SECTOR_SIZE};
use filecoin_proofs::constants::{SECTOR_SIZE_256_MIB, SECTOR_SIZE_ONE_KIB};
use filecoin_proofs::fr32::FR32_PADDING_MAP;
use filecoin_proofs::types::{PoRepProofPartitions, SectorSize};

@@ -443,11 +443,11 @@
fn max_unsealed_bytes_per_sector_checks() {
let xs = vec![
(
SectorClass(SectorSize(LIVE_SECTOR_SIZE), PoRepProofPartitions(2)),
SectorClass(SectorSize(SECTOR_SIZE_256_MIB), PoRepProofPartitions(2)),
266338304,
),
(
SectorClass(SectorSize(TEST_SECTOR_SIZE), PoRepProofPartitions(2)),
SectorClass(SectorSize(SECTOR_SIZE_ONE_KIB), PoRepProofPartitions(2)),
1016,
),
];
@@ -462,7 +462,7 @@
#[test]
fn unsealed_sector_write_and_truncate() {
let storage = create_sector_store(SectorClass(
SectorSize(TEST_SECTOR_SIZE),
SectorSize(SECTOR_SIZE_ONE_KIB),
PoRepProofPartitions(2),
));
let mgr = storage.manager();
@@ -556,7 +556,7 @@ pub mod tests {
#[test]
fn deletes_staging_access() {
let store = create_sector_store(SectorClass(
SectorSize(TEST_SECTOR_SIZE),
SectorSize(SECTOR_SIZE_ONE_KIB),
PoRepProofPartitions(2),
));
let access = store
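Aside: the impl AsRef<Path> parameters introduced above let callers pass &str, String, or PathBuf interchangeably, converting to an owned PathBuf only where ownership is needed (as new_sector_store does with as_ref().to_owned()). A small stand-alone sketch; the function name is hypothetical:

use std::path::{Path, PathBuf};

// Hypothetical helper showing the `impl AsRef<Path>` pattern from this commit.
fn sealed_dir(dir: impl AsRef<Path>) -> PathBuf {
    // Borrow as &Path, then take ownership only where a PathBuf is required.
    dir.as_ref().to_owned()
}

fn main() {
    let a = sealed_dir("/var/tmp/sealed");                 // &str
    let b = sealed_dir(String::from("/var/tmp/sealed"));   // String
    let c = sealed_dir(PathBuf::from("/var/tmp/sealed"));  // PathBuf
    assert_eq!(a, b);
    assert_eq!(b, c);
}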
5 changes: 2 additions & 3 deletions sector-builder/src/helpers/add_piece.rs
@@ -1,6 +1,5 @@
use std::fs::File;
use std::iter::Iterator;
use std::sync::Arc;

use filecoin_proofs::pieces::{
get_aligned_source, get_piece_alignment, sum_piece_bytes_with_alignment, PieceAlignment,
@@ -13,8 +12,8 @@ use crate::state::StagedState;
use crate::store::{SectorManager, SectorStore};
use storage_proofs::sector::SectorId;

pub fn add_piece(
sector_store: &Arc<impl SectorStore>,
pub fn add_piece<S: SectorStore>(
sector_store: &S,
mut staged_state: &mut StagedState,
piece_key: String,
piece_bytes_amount: u64,
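Aside: add_piece now takes &S where S: SectorStore instead of &Arc<impl SectorStore>. A minimal sketch of why callers holding an Arc are unaffected; the trait, struct, and method here are stand-ins, not the real API:

use std::sync::Arc;

// Stand-in trait; the real SectorStore has a much larger surface.
trait SectorStore {
    fn name(&self) -> &'static str;
}

struct DiskStore;

impl SectorStore for DiskStore {
    fn name(&self) -> &'static str {
        "disk"
    }
}

// Generic over the trait: no Arc in the signature.
fn add_piece<S: SectorStore>(store: &S) {
    println!("adding piece via {}", store.name());
}

fn main() {
    let store = Arc::new(DiskStore);
    // Arc<DiskStore> derefs to DiskStore, so a plain borrow still works.
    add_piece(&*store);
}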