Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
elliedavidson committed Aug 14, 2023
1 parent 6150883 commit 8537da9
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ pub struct ViewSyncTaskState<
/// How many timeouts we've seen in a row; is reset upon a successful view change
pub num_timeouts_tracked: u64,

/// Represents if replica task is running,
/// Map of running replica tasks
pub replica_task_map: HashMap<TYPES::Time, ViewSyncTaskInfo>,

/// Represents if relay task is running
/// Map of running relay tasks
pub relay_task_map: HashMap<TYPES::Time, ViewSyncTaskInfo>,

pub view_sync_timeout: Duration,
Expand Down Expand Up @@ -411,28 +411,47 @@ where

self.relay_task_map
.insert(vote_internal.round, ViewSyncTaskInfo { event_stream_id });
// TODO ED For now we will not await these futures, in the future we can await them only in the case of shutdown
let _view_sync_relay_task = async_spawn(async move {
ViewSyncRelayTaskStateTypes::build(builder).launch().await
});
}

&SequencingHotShotEvent::ViewChange(new_view) => {
let new_view = TYPES::Time::new(*new_view);
// TODO ED Don't call new twice
if self.current_view < TYPES::Time::new(*new_view) {
if self.current_view < new_view {
debug!(
"Change from view {} to view {} in view sync task",
*self.current_view, *new_view
);

self.current_view = TYPES::Time::new(*new_view);
self.current_view = new_view;
self.next_view = self.current_view;
self.num_timeouts_tracked = 0;

// GC old tasks: non inclusive of end, so is fine because we won't GC the latest view
// Garbage collect old tasks
// We could put this into a separate async task, but that would require making several fields on ViewSyncTaskState thread-safe and harm readability. In the common case this will have zero tasks to clean up.
for i in *self.last_garbage_collected_view..*self.current_view {
self.replica_task_map.remove_entry(&TYPES::Time::new(i));
self.relay_task_map.remove_entry(&TYPES::Time::new(i));
if let Some((_key, replica_task_info)) =
self.replica_task_map.remove_entry(&TYPES::Time::new(i))
{
self.event_stream
.direct_message(
replica_task_info.event_stream_id,
SequencingHotShotEvent::Shutdown,
)
.await;
}
if let Some((_key, relay_task_info)) =
self.relay_task_map.remove_entry(&TYPES::Time::new(i))
{
self.event_stream
.direct_message(
relay_task_info.event_stream_id,
SequencingHotShotEvent::Shutdown,
)
.await;
}
}

self.last_garbage_collected_view = self.current_view - 1;
Expand Down Expand Up @@ -507,8 +526,7 @@ where
},
));

// TODO ED Change from default filter
let filter = FilterEvent::default();
let filter = FilterEvent(Arc::new(Self::filter));
let builder =
TaskBuilder::<ViewSyncReplicaTaskStateTypes<TYPES, I, A>>::new(name)
.register_event_stream(replica_state.event_stream.clone(), filter)
Expand All @@ -525,7 +543,6 @@ where
ViewSyncTaskInfo { event_stream_id },
);

// TODO ED For now we will not await these futures, in the future we can await them only in the case of shutdown
let _view_sync_replica_task = async_spawn(async move {
ViewSyncReplicaTaskStateTypes::build(builder).launch().await
});
Expand Down Expand Up @@ -602,8 +619,6 @@ where
}
};

// warn!("received cert in handle_event for replica");

// Ignore certificate if it is for an older round
if certificate_internal.round < self.next_view {
debug!("We're already in a higher round");
Expand Down

0 comments on commit 8537da9

Please sign in to comment.