Skip to content

Commit

Permalink
Add button to pause plotting
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Mar 21, 2024
1 parent 13f07eb commit 0d688c4
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "d6b
parity-scale-codec = "3.6.9"
parking_lot = "0.12.1"
relm4 = "0.7.0-rc.1"
relm4-icons = { version = "0.7.0-alpha.2", features = ["checkmark", "cross", "grid-filled", "menu-large", "processor", "puzzle-piece", "size-horizontally", "ssd", "wallet2", "warning"] }
relm4-icons = { version = "0.7.0-alpha.2", features = ["checkmark", "cross", "grid-filled", "menu-large", "pause", "processor", "puzzle-piece", "size-horizontally", "ssd", "wallet2", "warning"] }
relm4-components = { version = "0.7.0-rc.1", default-features = false }
reqwest = { version = "0.11.25", default-features = false, features = ["json", "rustls-tls"] }
sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "d6b500960579d73c43fc4ef550b703acfa61c4c8", default-features = false }
Expand Down
19 changes: 18 additions & 1 deletion src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod utils;
use crate::backend::config::{Config, ConfigError, RawConfig};
use crate::backend::farmer::maybe_node_client::MaybeNodeRpcClient;
use crate::backend::farmer::{
DiskFarm, Farmer, FarmerNotification, FarmerOptions, InitialFarmState,
DiskFarm, Farmer, FarmerAction, FarmerNotification, FarmerOptions, InitialFarmState,
};
use crate::backend::networking::{create_network, NetworkOptions};
use crate::backend::node::{
Expand Down Expand Up @@ -231,6 +231,8 @@ pub enum BackendNotification {
pub enum BackendAction {
/// Config was created or updated
NewConfig { raw_config: RawConfig },
/// Farmer action
Farmer(FarmerAction),
}

struct LoadedBackend {
Expand Down Expand Up @@ -299,6 +301,12 @@ pub async fn create(
// Try to load config and start again
continue 'load;
}
BackendAction::Farmer(farmer_action) => {
warn!(
?farmer_action,
"Farmer action is not expected before initialization, ignored"
);
}
}
}

Expand Down Expand Up @@ -550,6 +558,8 @@ async fn run(
})
});

let mut farmer_action_sender = farmer.action_sender();

// Order is important here, we want to destroy dependents first and only then corresponding
// dependencies to avoid unnecessary errors and warnings in logs
let networking_fut = networking_fut;
Expand All @@ -562,6 +572,7 @@ async fn run(
process_backend_actions(
&config_file_path,
backend_action_receiver,
&mut farmer_action_sender,
&mut notifications_sender,
)
.await
Expand Down Expand Up @@ -991,6 +1002,7 @@ async fn create_farmer(
async fn process_backend_actions(
config_file_path: &Path,
backend_action_receiver: &mut mpsc::Receiver<BackendAction>,
farmer_action_sender: &mut mpsc::Sender<FarmerAction>,
notifications_sender: &mut mpsc::Sender<BackendNotification>,
) {
while let Some(action) = backend_action_receiver.next().await {
Expand All @@ -1013,6 +1025,11 @@ async fn process_backend_actions(
error!(%error, "Failed to send config save result notification");
}
}
BackendAction::Farmer(farmer_action) => {
if let Err(error) = farmer_action_sender.send(farmer_action).await {
error!(%error, "Failed to forward farmer action");
}
}
}
}
}
Expand Down
133 changes: 106 additions & 27 deletions src/backend/farmer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::PosTable;
use anyhow::anyhow;
use async_lock::Mutex as AsyncMutex;
use event_listener_primitives::HandlerId;
use futures::channel::oneshot;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt, TryStreamExt};
Expand All @@ -32,8 +32,8 @@ use subspace_farmer::utils::{
use subspace_farmer::NodeClient;
use subspace_farmer_components::plotting::PlottedSector;
use thread_priority::ThreadPriority;
use tokio::sync::{Barrier, Semaphore};
use tracing::{error, info, info_span, Instrument};
use tokio::sync::{watch, Barrier, Semaphore};
use tracing::{debug, error, info, info_span, Instrument};

/// Minimal cache percentage, there is no need in setting it higher
const CACHE_PERCENTAGE: NonZeroU8 = NonZeroU8::MIN;
Expand Down Expand Up @@ -64,20 +64,42 @@ pub enum FarmerNotification {
},
}

#[derive(Debug, Clone)]
pub enum FarmerAction {
/// Pause (or resume) plotting
PausePlotting(bool),
}

type Notifications = Handler<FarmerNotification>;

pub(super) struct Farmer {
farm_fut: BoxFuture<'static, anyhow::Result<()>>,
farmer_fut: BoxFuture<'static, anyhow::Result<()>>,
farmer_cache_worker_fut: BoxFuture<'static, ()>,
initial_farm_states: Vec<InitialFarmState>,
farm_during_initial_plotting: bool,
notifications: Arc<Notifications>,
action_sender: mpsc::Sender<FarmerAction>,
}

impl Farmer {
pub(super) async fn run(self) -> anyhow::Result<()> {
let Farmer {
farmer_fut,
farmer_cache_worker_fut,
initial_farm_states,
farm_during_initial_plotting: _,
notifications,
action_sender,
} = self;

// Explicitly drop unnecessary things, especially senders to make sure farmer can exit
// gracefully when `fn run()`'s future is dropped
drop(initial_farm_states);
drop(notifications);
drop(action_sender);

let farmer_cache_worker_fut = match run_future_in_dedicated_thread(
move || self.farmer_cache_worker_fut,
move || farmer_cache_worker_fut,
"farmer-cache-worker".to_string(),
) {
Ok(farmer_cache_worker_fut) => farmer_cache_worker_fut,
Expand All @@ -88,17 +110,15 @@ impl Farmer {
}
};

let farm_fut = match run_future_in_dedicated_thread(
move || self.farm_fut,
"farmer-farm".to_string(),
) {
Ok(farmer_cache_worker_fut) => farmer_cache_worker_fut,
Err(error) => {
return Err(anyhow::anyhow!(
"Failed to spawn farm future in background thread: {error}"
));
}
};
let farm_fut =
match run_future_in_dedicated_thread(move || farmer_fut, "farmer-farmer".to_string()) {
Ok(farmer_cache_worker_fut) => farmer_cache_worker_fut,
Err(error) => {
return Err(anyhow::anyhow!(
"Failed to spawn farm future in background thread: {error}"
));
}
};

select! {
_ = farmer_cache_worker_fut.fuse() => {
Expand All @@ -120,6 +140,10 @@ impl Farmer {
self.farm_during_initial_plotting
}

pub(super) fn action_sender(&self) -> mpsc::Sender<FarmerAction> {
self.action_sender.clone()
}

pub(super) fn on_notification(&self, callback: HandlerFn<FarmerNotification>) -> HandlerId {
self.notifications.add(callback)
}
Expand Down Expand Up @@ -243,6 +267,8 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu
}
}

let plotting_thread_pools_count = plotting_thread_pool_core_indices.len();

let downloading_semaphore =
Arc::new(Semaphore::new(plotting_thread_pool_core_indices.len() + 1));

Expand Down Expand Up @@ -567,28 +593,81 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu
// event handlers
drop(plotted_pieces);

let farm_fut = Box::pin(
async move {
while let Some(result) = farms_stream.next().await {
match result {
Ok(id) => {
info!(%id, "Farm exited successfully");
}
Err(error) => {
return Err(error);
let (action_sender, mut action_receiver) = mpsc::channel(1);
let (pause_plotting_sender, mut pause_plotting_receiver) = watch::channel(false);

let pause_plotting_actions_fut = async move {
let mut thread_pools = Vec::with_capacity(plotting_thread_pools_count);

loop {
if *pause_plotting_receiver.borrow_and_update() {
// Collect all managers so that plotting will be effectively paused
if thread_pools.len() < plotting_thread_pools_count {
thread_pools.push(plotting_thread_pool_manager.get_thread_pools().await);
// Allow to un-pause plotting quickly if user requests it
continue;
}
} else {
// Returns all thread pools back to the manager
thread_pools.clear();
}

if pause_plotting_receiver.changed().await.is_err() {
break;
}
}
};

let process_actions_fut = async move {
while let Some(action) = action_receiver.next().await {
match action {
FarmerAction::PausePlotting(pause_plotting) => {
if let Err(error) = pause_plotting_sender.send(pause_plotting) {
debug!(%error, "Failed to forward pause plotting");
}
}
}
anyhow::Ok(())
}
anyhow::Ok(())
};

let farms_fut = async move {
while let Some(result) = farms_stream.next().await {
match result {
Ok(id) => {
info!(%id, "Farm exited successfully");
}
Err(error) => {
return Err(error);
}
}
}
anyhow::Ok(())
};

let farmer_fut = Box::pin(
async move {
select! {
_ = pause_plotting_actions_fut.fuse() => {
Ok(())
}
_ = process_actions_fut.fuse() => {
Ok(())
}
result = farms_fut.fuse() => {
result
}
}
}
.in_current_span(),
);

anyhow::Ok(Farmer {
farm_fut,
farmer_fut,
farmer_cache_worker_fut,
initial_farm_states,
farm_during_initial_plotting,
notifications,
action_sender,
})
}
45 changes: 39 additions & 6 deletions src/frontend/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ use relm4::prelude::*;
use relm4_icons::icon_name;
use subspace_core_primitives::BlockNumber;
use subspace_runtime_primitives::{Balance, SSC};
use tracing::debug;

#[derive(Debug)]
pub struct RunningInit {
pub plotting_paused: bool,
}

#[derive(Debug)]
pub enum RunningInput {
Expand All @@ -27,6 +33,12 @@ pub enum RunningInput {
NodeNotification(NodeNotification),
FarmerNotification(FarmerNotification),
ToggleFarmDetails,
TogglePausePlotting,
}

#[derive(Debug)]
pub enum RunningOutput {
PausePlotting(bool),
}

#[derive(Debug, Default)]
Expand All @@ -44,13 +56,14 @@ pub struct RunningView {
node_synced: bool,
farmer_state: FarmerState,
farms: FactoryHashMap<u8, FarmWidget>,
plotting_paused: bool,
}

#[relm4::component(pub)]
impl Component for RunningView {
type Init = ();
type Init = RunningInit;
type Input = RunningInput;
type Output = ();
type Output = RunningOutput;
type CommandOutput = ();

view! {
Expand Down Expand Up @@ -82,6 +95,13 @@ impl Component for RunningView {
set_icon_name: icon_name::GRID_FILLED,
set_tooltip: "Expand details about each farm",
},
gtk::ToggleButton {
connect_clicked => RunningInput::TogglePausePlotting,
set_active: model.plotting_paused,
set_has_frame: false,
set_icon_name: icon_name::PAUSE,
set_tooltip: "Pause plotting/replotting, note that currently encoding sectors will not be interrupted",
},
gtk::Box {
set_halign: gtk::Align::End,
set_hexpand: true,
Expand Down Expand Up @@ -163,7 +183,7 @@ impl Component for RunningView {
}

fn init(
_init: Self::Init,
init: Self::Init,
_root: Self::Root,
_sender: ComponentSender<Self>,
) -> ComponentParts<Self> {
Expand All @@ -177,6 +197,7 @@ impl Component for RunningView {
node_synced: false,
farmer_state: FarmerState::default(),
farms,
plotting_paused: init.plotting_paused,
};

let farms_box = model.farms.widget();
Expand All @@ -185,13 +206,13 @@ impl Component for RunningView {
ComponentParts { model, widgets }
}

fn update(&mut self, input: Self::Input, _sender: ComponentSender<Self>, _root: &Self::Root) {
self.process_input(input);
fn update(&mut self, input: Self::Input, sender: ComponentSender<Self>, _root: &Self::Root) {
self.process_input(input, sender);
}
}

impl RunningView {
fn process_input(&mut self, input: RunningInput) {
fn process_input(&mut self, input: RunningInput, sender: ComponentSender<Self>) {
match input {
RunningInput::Initialize {
best_block_number,
Expand All @@ -217,6 +238,7 @@ impl RunningView {
total_sectors: initial_farm_state.total_sectors_count,
plotted_total_sectors: initial_farm_state.plotted_sectors_count,
farm_during_initial_plotting,
plotting_paused: self.plotting_paused,
},
);
}
Expand Down Expand Up @@ -309,6 +331,17 @@ impl RunningView {
RunningInput::ToggleFarmDetails => {
self.farms.broadcast(FarmWidgetInput::ToggleFarmDetails);
}
RunningInput::TogglePausePlotting => {
self.plotting_paused = !self.plotting_paused;
self.farms
.broadcast(FarmWidgetInput::PausePlotting(self.plotting_paused));
if sender
.output(RunningOutput::PausePlotting(self.plotting_paused))
.is_err()
{
debug!("Failed to send RunningOutput::TogglePausePlotting");
}
}
}
}
}
Loading

0 comments on commit 0d688c4

Please sign in to comment.