From 8537da9688e5b342eb1e26329ea6739599582171 Mon Sep 17 00:00:00 2001
From: elliedavidson <118024407+elliedavidson@users.noreply.github.com>
Date: Mon, 14 Aug 2023 15:58:58 -0400
Subject: [PATCH] cleanup

---
 task-impls/src/view_sync.rs | 41 +++++++++++++++++++++++++------------
 1 file changed, 28 insertions(+), 13 deletions(-)

diff --git a/task-impls/src/view_sync.rs b/task-impls/src/view_sync.rs
index 00822212e9..ee0426f4ce 100644
--- a/task-impls/src/view_sync.rs
+++ b/task-impls/src/view_sync.rs
@@ -91,10 +91,10 @@ pub struct ViewSyncTaskState<
     /// How many timeouts we've seen in a row; is reset upon a successful view change
     pub num_timeouts_tracked: u64,
 
-    /// Represents if replica task is running,
+    /// Map of running replica tasks
     pub replica_task_map: HashMap<TYPES::Time, ViewSyncTaskInfo>,
 
-    /// Represents if relay task is running
+    /// Map of running relay tasks
     pub relay_task_map: HashMap<TYPES::Time, ViewSyncTaskInfo>,
 
     pub view_sync_timeout: Duration,
@@ -411,28 +411,47 @@ where
                 self.relay_task_map
                     .insert(vote_internal.round, ViewSyncTaskInfo { event_stream_id });
 
-                // TODO ED For now we will not await these futures, in the future we can await them only in the case of shutdown
                 let _view_sync_relay_task = async_spawn(async move {
                     ViewSyncRelayTaskStateTypes::build(builder).launch().await
                 });
             }
             &SequencingHotShotEvent::ViewChange(new_view) => {
+                let new_view = TYPES::Time::new(*new_view);
                 // TODO ED Don't call new twice
-                if self.current_view < TYPES::Time::new(*new_view) {
+                if self.current_view < new_view {
                     debug!(
                         "Change from view {} to view {} in view sync task",
                         *self.current_view, *new_view
                     );
 
-                    self.current_view = TYPES::Time::new(*new_view);
+                    self.current_view = new_view;
                     self.next_view = self.current_view;
                     self.num_timeouts_tracked = 0;
 
-                    // GC old tasks: non inclusive of end, so is fine because we won't GC the latest view
+                    // Garbage collect old tasks
+                    // We could put this into a separate async task, but that would require making several fields on ViewSyncTaskState thread-safe and harm readability. In the common case this will have zero tasks to clean up.
                     for i in *self.last_garbage_collected_view..*self.current_view {
-                        self.replica_task_map.remove_entry(&TYPES::Time::new(i));
-                        self.relay_task_map.remove_entry(&TYPES::Time::new(i));
+                        if let Some((_key, replica_task_info)) =
+                            self.replica_task_map.remove_entry(&TYPES::Time::new(i))
+                        {
+                            self.event_stream
+                                .direct_message(
+                                    replica_task_info.event_stream_id,
+                                    SequencingHotShotEvent::Shutdown,
+                                )
+                                .await;
+                        }
+                        if let Some((_key, relay_task_info)) =
+                            self.relay_task_map.remove_entry(&TYPES::Time::new(i))
+                        {
+                            self.event_stream
+                                .direct_message(
+                                    relay_task_info.event_stream_id,
+                                    SequencingHotShotEvent::Shutdown,
+                                )
+                                .await;
+                        }
                     }
 
                     self.last_garbage_collected_view = self.current_view - 1;
@@ -507,8 +526,7 @@ where
                 },
             ));
 
-            // TODO ED Change from default filter
-            let filter = FilterEvent::default();
+            let filter = FilterEvent(Arc::new(Self::filter));
 
            let builder = TaskBuilder::<ViewSyncReplicaTaskStateTypes<TYPES, I, A>>::new(name)
                .register_event_stream(replica_state.event_stream.clone(), filter)
@@ -525,7 +543,6 @@ where
            self.replica_task_map.insert(
                TYPES::Time::new(*vote_internal.round),
                ViewSyncTaskInfo { event_stream_id },
            );
 
-            // TODO ED For now we will not await these futures, in the future we can await them only in the case of shutdown
            let _view_sync_replica_task = async_spawn(async move {
                ViewSyncReplicaTaskStateTypes::build(builder).launch().await
            });
@@ -602,8 +619,6 @@ where
            }
        };
 
-        // warn!("received cert in handle_event for replica");
-
        // Ignore certificate if it is for an older round
        if certificate_internal.round < self.next_view {
            debug!("We're already in a higher round");
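
The core pattern in the second hunk — remove each stale view's map entry, then direct-message that task a Shutdown event instead of silently dropping it — is sketched below in isolation. This is a minimal stand-in, not HotShot's API: a tokio mpsc channel plays the role of the shared event stream and its direct_message call, and Event, TaskInfo, and GcState are simplified illustrative types. Note that the range's exclusive upper bound means tasks for the current view are never collected, exactly as the original "non inclusive of end" comment pointed out.

// Minimal sketch (not HotShot code): shutdown-on-GC for per-view task maps.
// Assumes tokio with the "macros" and "rt" features; all names here are
// illustrative stand-ins for the patch's types.
use std::collections::HashMap;

use tokio::sync::mpsc;

#[derive(Debug)]
enum Event {
    Shutdown,
}

// Stand-in for the patch's ViewSyncTaskInfo: instead of an event_stream_id
// used with direct_message, hold a channel directly to the task.
struct TaskInfo {
    sender: mpsc::Sender<Event>,
}

struct GcState {
    replica_task_map: HashMap<u64, TaskInfo>,
    relay_task_map: HashMap<u64, TaskInfo>,
    last_garbage_collected_view: u64,
    current_view: u64,
}

impl GcState {
    // Mirrors the patch's loop: the exclusive upper bound guarantees the
    // current view's tasks survive, and every older entry is removed and
    // its task told to shut down rather than being silently dropped.
    async fn garbage_collect(&mut self) {
        for view in self.last_garbage_collected_view..self.current_view {
            if let Some((_view, info)) = self.replica_task_map.remove_entry(&view) {
                // The patch calls event_stream.direct_message(id, Shutdown);
                // a plain channel send models the same one-task delivery.
                let _ = info.sender.send(Event::Shutdown).await;
            }
            if let Some((_view, info)) = self.relay_task_map.remove_entry(&view) {
                let _ = info.sender.send(Event::Shutdown).await;
            }
        }
        // saturating_sub avoids underflow at view 0; the patch subtracts
        // directly on its view type.
        self.last_garbage_collected_view = self.current_view.saturating_sub(1);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    let mut state = GcState {
        replica_task_map: HashMap::from([(3, TaskInfo { sender: tx })]),
        relay_task_map: HashMap::new(),
        last_garbage_collected_view: 0,
        current_view: 5,
    };
    state.garbage_collect().await;
    // The replica task registered for view 3 receives its shutdown signal.
    assert!(matches!(rx.recv().await, Some(Event::Shutdown)));
}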