From 0d688c415529f54ba9414471e63e7d8229e178fc Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 21 Mar 2024 23:21:45 +0200 Subject: [PATCH] Add button to pause plotting --- Cargo.toml | 2 +- src/backend.rs | 19 ++++- src/backend/farmer.rs | 133 ++++++++++++++++++++++++++++------- src/frontend/running.rs | 45 ++++++++++-- src/frontend/running/farm.rs | 75 ++++++++++++++------ src/main.rs | 32 ++++++++- 6 files changed, 247 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 855afaa..c25a791 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,7 @@ pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "d6b parity-scale-codec = "3.6.9" parking_lot = "0.12.1" relm4 = "0.7.0-rc.1" -relm4-icons = { version = "0.7.0-alpha.2", features = ["checkmark", "cross", "grid-filled", "menu-large", "processor", "puzzle-piece", "size-horizontally", "ssd", "wallet2", "warning"] } +relm4-icons = { version = "0.7.0-alpha.2", features = ["checkmark", "cross", "grid-filled", "menu-large", "pause", "processor", "puzzle-piece", "size-horizontally", "ssd", "wallet2", "warning"] } relm4-components = { version = "0.7.0-rc.1", default-features = false } reqwest = { version = "0.11.25", default-features = false, features = ["json", "rustls-tls"] } sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "d6b500960579d73c43fc4ef550b703acfa61c4c8", default-features = false } diff --git a/src/backend.rs b/src/backend.rs index 7db4be5..9baaaf1 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -8,7 +8,7 @@ mod utils; use crate::backend::config::{Config, ConfigError, RawConfig}; use crate::backend::farmer::maybe_node_client::MaybeNodeRpcClient; use crate::backend::farmer::{ - DiskFarm, Farmer, FarmerNotification, FarmerOptions, InitialFarmState, + DiskFarm, Farmer, FarmerAction, FarmerNotification, FarmerOptions, InitialFarmState, }; use crate::backend::networking::{create_network, NetworkOptions}; use crate::backend::node::{ @@ -231,6 +231,8 @@ pub enum BackendNotification { pub enum BackendAction { /// Config was created or updated NewConfig { raw_config: RawConfig }, + /// Farmer action + Farmer(FarmerAction), } struct LoadedBackend { @@ -299,6 +301,12 @@ pub async fn create( // Try to load config and start again continue 'load; } + BackendAction::Farmer(farmer_action) => { + warn!( + ?farmer_action, + "Farmer action is not expected before initialization, ignored" + ); + } } } @@ -550,6 +558,8 @@ async fn run( }) }); + let mut farmer_action_sender = farmer.action_sender(); + // Order is important here, we want to destroy dependents first and only then corresponding // dependencies to avoid unnecessary errors and warnings in logs let networking_fut = networking_fut; @@ -562,6 +572,7 @@ async fn run( process_backend_actions( &config_file_path, backend_action_receiver, + &mut farmer_action_sender, &mut notifications_sender, ) .await @@ -991,6 +1002,7 @@ async fn create_farmer( async fn process_backend_actions( config_file_path: &Path, backend_action_receiver: &mut mpsc::Receiver, + farmer_action_sender: &mut mpsc::Sender, notifications_sender: &mut mpsc::Sender, ) { while let Some(action) = backend_action_receiver.next().await { @@ -1013,6 +1025,11 @@ async fn process_backend_actions( error!(%error, "Failed to send config save result notification"); } } + BackendAction::Farmer(farmer_action) => { + if let Err(error) = farmer_action_sender.send(farmer_action).await { + error!(%error, "Failed to forward farmer action"); + } + } } } } diff --git a/src/backend/farmer.rs b/src/backend/farmer.rs index cdd04a1..b7c015f 100644 --- a/src/backend/farmer.rs +++ b/src/backend/farmer.rs @@ -7,7 +7,7 @@ use crate::PosTable; use anyhow::anyhow; use async_lock::Mutex as AsyncMutex; use event_listener_primitives::HandlerId; -use futures::channel::oneshot; +use futures::channel::{mpsc, oneshot}; use futures::future::BoxFuture; use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{select, FutureExt, StreamExt, TryStreamExt}; @@ -32,8 +32,8 @@ use subspace_farmer::utils::{ use subspace_farmer::NodeClient; use subspace_farmer_components::plotting::PlottedSector; use thread_priority::ThreadPriority; -use tokio::sync::{Barrier, Semaphore}; -use tracing::{error, info, info_span, Instrument}; +use tokio::sync::{watch, Barrier, Semaphore}; +use tracing::{debug, error, info, info_span, Instrument}; /// Minimal cache percentage, there is no need in setting it higher const CACHE_PERCENTAGE: NonZeroU8 = NonZeroU8::MIN; @@ -64,20 +64,42 @@ pub enum FarmerNotification { }, } +#[derive(Debug, Clone)] +pub enum FarmerAction { + /// Pause (or resume) plotting + PausePlotting(bool), +} + type Notifications = Handler; pub(super) struct Farmer { - farm_fut: BoxFuture<'static, anyhow::Result<()>>, + farmer_fut: BoxFuture<'static, anyhow::Result<()>>, farmer_cache_worker_fut: BoxFuture<'static, ()>, initial_farm_states: Vec, farm_during_initial_plotting: bool, notifications: Arc, + action_sender: mpsc::Sender, } impl Farmer { pub(super) async fn run(self) -> anyhow::Result<()> { + let Farmer { + farmer_fut, + farmer_cache_worker_fut, + initial_farm_states, + farm_during_initial_plotting: _, + notifications, + action_sender, + } = self; + + // Explicitly drop unnecessary things, especially senders to make sure farmer can exit + // gracefully when `fn run()`'s future is dropped + drop(initial_farm_states); + drop(notifications); + drop(action_sender); + let farmer_cache_worker_fut = match run_future_in_dedicated_thread( - move || self.farmer_cache_worker_fut, + move || farmer_cache_worker_fut, "farmer-cache-worker".to_string(), ) { Ok(farmer_cache_worker_fut) => farmer_cache_worker_fut, @@ -88,17 +110,15 @@ impl Farmer { } }; - let farm_fut = match run_future_in_dedicated_thread( - move || self.farm_fut, - "farmer-farm".to_string(), - ) { - Ok(farmer_cache_worker_fut) => farmer_cache_worker_fut, - Err(error) => { - return Err(anyhow::anyhow!( - "Failed to spawn farm future in background thread: {error}" - )); - } - }; + let farm_fut = + match run_future_in_dedicated_thread(move || farmer_fut, "farmer-farmer".to_string()) { + Ok(farmer_cache_worker_fut) => farmer_cache_worker_fut, + Err(error) => { + return Err(anyhow::anyhow!( + "Failed to spawn farm future in background thread: {error}" + )); + } + }; select! { _ = farmer_cache_worker_fut.fuse() => { @@ -120,6 +140,10 @@ impl Farmer { self.farm_during_initial_plotting } + pub(super) fn action_sender(&self) -> mpsc::Sender { + self.action_sender.clone() + } + pub(super) fn on_notification(&self, callback: HandlerFn) -> HandlerId { self.notifications.add(callback) } @@ -243,6 +267,8 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu } } + let plotting_thread_pools_count = plotting_thread_pool_core_indices.len(); + let downloading_semaphore = Arc::new(Semaphore::new(plotting_thread_pool_core_indices.len() + 1)); @@ -567,28 +593,81 @@ pub(super) async fn create_farmer(farmer_options: FarmerOptions) -> anyhow::Resu // event handlers drop(plotted_pieces); - let farm_fut = Box::pin( - async move { - while let Some(result) = farms_stream.next().await { - match result { - Ok(id) => { - info!(%id, "Farm exited successfully"); - } - Err(error) => { - return Err(error); + let (action_sender, mut action_receiver) = mpsc::channel(1); + let (pause_plotting_sender, mut pause_plotting_receiver) = watch::channel(false); + + let pause_plotting_actions_fut = async move { + let mut thread_pools = Vec::with_capacity(plotting_thread_pools_count); + + loop { + if *pause_plotting_receiver.borrow_and_update() { + // Collect all managers so that plotting will be effectively paused + if thread_pools.len() < plotting_thread_pools_count { + thread_pools.push(plotting_thread_pool_manager.get_thread_pools().await); + // Allow to un-pause plotting quickly if user requests it + continue; + } + } else { + // Returns all thread pools back to the manager + thread_pools.clear(); + } + + if pause_plotting_receiver.changed().await.is_err() { + break; + } + } + }; + + let process_actions_fut = async move { + while let Some(action) = action_receiver.next().await { + match action { + FarmerAction::PausePlotting(pause_plotting) => { + if let Err(error) = pause_plotting_sender.send(pause_plotting) { + debug!(%error, "Failed to forward pause plotting"); } } } - anyhow::Ok(()) + } + anyhow::Ok(()) + }; + + let farms_fut = async move { + while let Some(result) = farms_stream.next().await { + match result { + Ok(id) => { + info!(%id, "Farm exited successfully"); + } + Err(error) => { + return Err(error); + } + } + } + anyhow::Ok(()) + }; + + let farmer_fut = Box::pin( + async move { + select! { + _ = pause_plotting_actions_fut.fuse() => { + Ok(()) + } + _ = process_actions_fut.fuse() => { + Ok(()) + } + result = farms_fut.fuse() => { + result + } + } } .in_current_span(), ); anyhow::Ok(Farmer { - farm_fut, + farmer_fut, farmer_cache_worker_fut, initial_farm_states, farm_during_initial_plotting, notifications, + action_sender, }) } diff --git a/src/frontend/running.rs b/src/frontend/running.rs index c5fa023..ca8d872 100644 --- a/src/frontend/running.rs +++ b/src/frontend/running.rs @@ -13,6 +13,12 @@ use relm4::prelude::*; use relm4_icons::icon_name; use subspace_core_primitives::BlockNumber; use subspace_runtime_primitives::{Balance, SSC}; +use tracing::debug; + +#[derive(Debug)] +pub struct RunningInit { + pub plotting_paused: bool, +} #[derive(Debug)] pub enum RunningInput { @@ -27,6 +33,12 @@ pub enum RunningInput { NodeNotification(NodeNotification), FarmerNotification(FarmerNotification), ToggleFarmDetails, + TogglePausePlotting, +} + +#[derive(Debug)] +pub enum RunningOutput { + PausePlotting(bool), } #[derive(Debug, Default)] @@ -44,13 +56,14 @@ pub struct RunningView { node_synced: bool, farmer_state: FarmerState, farms: FactoryHashMap, + plotting_paused: bool, } #[relm4::component(pub)] impl Component for RunningView { - type Init = (); + type Init = RunningInit; type Input = RunningInput; - type Output = (); + type Output = RunningOutput; type CommandOutput = (); view! { @@ -82,6 +95,13 @@ impl Component for RunningView { set_icon_name: icon_name::GRID_FILLED, set_tooltip: "Expand details about each farm", }, + gtk::ToggleButton { + connect_clicked => RunningInput::TogglePausePlotting, + set_active: model.plotting_paused, + set_has_frame: false, + set_icon_name: icon_name::PAUSE, + set_tooltip: "Pause plotting/replotting, note that currently encoding sectors will not be interrupted", + }, gtk::Box { set_halign: gtk::Align::End, set_hexpand: true, @@ -163,7 +183,7 @@ impl Component for RunningView { } fn init( - _init: Self::Init, + init: Self::Init, _root: Self::Root, _sender: ComponentSender, ) -> ComponentParts { @@ -177,6 +197,7 @@ impl Component for RunningView { node_synced: false, farmer_state: FarmerState::default(), farms, + plotting_paused: init.plotting_paused, }; let farms_box = model.farms.widget(); @@ -185,13 +206,13 @@ impl Component for RunningView { ComponentParts { model, widgets } } - fn update(&mut self, input: Self::Input, _sender: ComponentSender, _root: &Self::Root) { - self.process_input(input); + fn update(&mut self, input: Self::Input, sender: ComponentSender, _root: &Self::Root) { + self.process_input(input, sender); } } impl RunningView { - fn process_input(&mut self, input: RunningInput) { + fn process_input(&mut self, input: RunningInput, sender: ComponentSender) { match input { RunningInput::Initialize { best_block_number, @@ -217,6 +238,7 @@ impl RunningView { total_sectors: initial_farm_state.total_sectors_count, plotted_total_sectors: initial_farm_state.plotted_sectors_count, farm_during_initial_plotting, + plotting_paused: self.plotting_paused, }, ); } @@ -309,6 +331,17 @@ impl RunningView { RunningInput::ToggleFarmDetails => { self.farms.broadcast(FarmWidgetInput::ToggleFarmDetails); } + RunningInput::TogglePausePlotting => { + self.plotting_paused = !self.plotting_paused; + self.farms + .broadcast(FarmWidgetInput::PausePlotting(self.plotting_paused)); + if sender + .output(RunningOutput::PausePlotting(self.plotting_paused)) + .is_err() + { + debug!("Failed to send RunningOutput::TogglePausePlotting"); + } + } } } } diff --git a/src/frontend/running/farm.rs b/src/frontend/running/farm.rs index a1ca92f..0ebc9b4 100644 --- a/src/frontend/running/farm.rs +++ b/src/frontend/running/farm.rs @@ -76,6 +76,7 @@ pub(super) struct FarmWidgetInit { pub(super) total_sectors: SectorIndex, pub(super) plotted_total_sectors: SectorIndex, pub(super) farm_during_initial_plotting: bool, + pub(super) plotting_paused: bool, } #[derive(Debug, Clone)] @@ -85,6 +86,7 @@ pub(super) enum FarmWidgetInput { update: SectorUpdate, }, FarmingNotification(FarmingNotification), + PausePlotting(bool), OpenFarmFolder, NodeSynced(bool), ToggleFarmDetails, @@ -105,6 +107,8 @@ pub(super) struct FarmWidget { sectors: HashMap, non_fatal_farming_error: Option>, farm_details: bool, + encoding_sectors: usize, + plotting_paused: bool, } #[relm4::factory(pub(super))] @@ -239,31 +243,51 @@ impl FactoryComponent for FarmWidget { match kind { PlottingKind::Initial => { - if self.farm_during_initial_plotting { - let farming = if self.is_node_synced { - "farming" + let initial_plotting = if self.plotting_paused { + if self.encoding_sectors > 0 { + "Pausing initial plotting" } else { - "not farming" - }; - format!( - "Initial plotting {:.2}%{}, {}", - progress, - plotting_speed, - farming - ) + "Paused initial plotting" + } } else { - format!( - "Initial plotting {:.2}%{}, not farming", - progress, - plotting_speed, - ) - } + "Initial plotting" + }; + let farming = if self.is_node_synced && self.farm_during_initial_plotting { + "farming" + } else { + "not farming" + }; + format!( + "{} {:.2}%{}, {}", + initial_plotting, + progress, + plotting_speed, + farming, + ) + }, + PlottingKind::Replotting => { + let replotting = if self.plotting_paused { + if self.encoding_sectors > 0 { + "Pausing replotting" + } else { + "Paused replotting" + } + } else { + "Replotting" + }; + let farming = if self.is_node_synced { + "farming" + } else { + "not farming" + }; + format!( + "{} {:.2}%{}, {}", + replotting, + progress, + plotting_speed, + farming, + ) }, - PlottingKind::Replotting => format!( - "Replotting {:.2}%{}, farming", - progress, - plotting_speed, - ), } }, }, @@ -336,6 +360,8 @@ impl FactoryComponent for FarmWidget { sectors: HashMap::from_iter((SectorIndex::MIN..).zip(sectors)), non_fatal_farming_error: None, farm_details: false, + encoding_sectors: 0, + plotting_paused: init.plotting_paused, } } @@ -377,9 +403,11 @@ impl FarmWidget { self.remove_sector_state(sector_index, SectorState::Downloading); } SectorPlottingDetails::Encoding => { + self.encoding_sectors += 1; self.update_sector_state(sector_index, SectorState::Encoding); } SectorPlottingDetails::Encoded(_) => { + self.encoding_sectors -= 1; self.remove_sector_state(sector_index, SectorState::Encoding); } SectorPlottingDetails::Writing => { @@ -423,6 +451,9 @@ impl FarmWidget { self.non_fatal_farming_error.replace(error); } }, + FarmWidgetInput::PausePlotting(plotting_paused) => { + self.plotting_paused = plotting_paused; + } FarmWidgetInput::OpenFarmFolder => { if let Err(error) = open::that_detached(&self.path) { error!(%error, path = %self.path.display(), "Failed to open farm folder"); diff --git a/src/main.rs b/src/main.rs index c776591..e2d0181 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,11 +5,12 @@ mod backend; mod frontend; use crate::backend::config::RawConfig; +use crate::backend::farmer::FarmerAction; use crate::backend::{wipe, BackendAction, BackendNotification}; use crate::frontend::configuration::{ConfigurationInput, ConfigurationOutput, ConfigurationView}; use crate::frontend::loading::{LoadingInput, LoadingView}; use crate::frontend::new_version::NewVersion; -use crate::frontend::running::{RunningInput, RunningView}; +use crate::frontend::running::{RunningInit, RunningInput, RunningOutput, RunningView}; use clap::Parser; use duct::cmd; use file_rotate::compression::Compression; @@ -82,6 +83,7 @@ type PosTable = ChiaTable; enum AppInput { BackendNotification(BackendNotification), Configuration(ConfigurationOutput), + Running(RunningOutput), OpenLogFolder, OpenReconfiguration, ShowAboutDialog, @@ -432,7 +434,12 @@ impl AsyncComponent for App { .launch(root.clone()) .forward(sender.input_sender(), AppInput::Configuration); - let running_view = RunningView::builder().launch(()).detach(); + let running_view = RunningView::builder() + .launch(RunningInit { + // Not paused on start + plotting_paused: false, + }) + .forward(sender.input_sender(), AppInput::Running); let about_dialog = gtk::AboutDialog::builder() .title("About") @@ -534,6 +541,9 @@ impl AsyncComponent for App { self.process_configuration_output(configuration_output) .await; } + AppInput::Running(running_output) => { + self.process_running_output(running_output).await; + } AppInput::OpenReconfiguration => { self.menu_popover.hide(); if let Some(raw_config) = self.current_raw_config.clone() { @@ -697,6 +707,24 @@ impl App { } } + async fn process_running_output(&mut self, running_output: RunningOutput) { + match running_output { + RunningOutput::PausePlotting(pause_plotting) => { + if let Err(error) = self + .backend_action_sender + .send(BackendAction::Farmer(FarmerAction::PausePlotting( + pause_plotting, + ))) + .await + { + self.current_view = View::Error(anyhow::anyhow!( + "Failed to send pause plotting to backend: {error}" + )); + } + } + } + } + fn process_command(&mut self, input: AppCommandOutput) { match input { AppCommandOutput::BackendNotification(notification) => {