Skip to content

Commit

Permalink
Cleanup & proper exit once all streams close
Browse files Browse the repository at this point in the history
  • Loading branch information
tarkah committed Sep 24, 2024
1 parent 7879273 commit 8abfb9f
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 117 deletions.
18 changes: 14 additions & 4 deletions data/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,20 @@ impl Map {
}
}

pub fn exit(&mut self) -> HashSet<Server> {
self.0
.iter_mut()
.filter_map(|(server, state)| {
if let State::Ready(client) = state {
client.quit(None);
Some(server.clone())
} else {
None
}
})
.collect()
}

pub fn resolve_user_attributes<'a>(
&'a self,
server: &Server,
Expand Down Expand Up @@ -1560,10 +1574,6 @@ impl Map {
}
})
}

pub fn take(&mut self) -> Self {
Self(std::mem::take(&mut self.0))
}
}

#[derive(Debug, Clone)]
Expand Down
5 changes: 3 additions & 2 deletions data/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use irc::proto;
use tokio::fs;
use tokio::time::Instant;

pub use self::manager::{Manager, Resource};
pub use self::metadata::{Metadata, ReadMarker};
use crate::user::Nick;
use crate::{compression, environment, message, server, Message};

pub use self::manager::{Manager, Resource};
pub use self::metadata::{Metadata, ReadMarker};

pub mod manager;
pub mod metadata;

Expand Down
82 changes: 42 additions & 40 deletions data/src/history/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,18 @@ pub enum Message {
Result<Option<history::ReadMarker>, history::Error>,
),
Flushed(server::Server, history::Kind, Result<(), history::Error>),
Exited(
Vec<(
Server,
history::Kind,
Result<Option<history::ReadMarker>, history::Error>,
)>,
),
}

pub enum Event {
Closed(server::Server, history::Kind, Option<history::ReadMarker>),
Exited(Vec<(Server, history::Kind, Option<history::ReadMarker>)>),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -109,12 +117,12 @@ impl Manager {
Message::Flushed(server, kind, Err(error)) => {
log::warn!("failed to flush history for {kind} on {server}: {error}")
}
Message::UpdatePartial(server, kind, Ok(loaded)) => {
log::debug!("updating metadata for {kind} on {server}");
self.data.update_partial(server, kind, loaded);
Message::UpdatePartial(server, kind, Ok(metadata)) => {
log::debug!("loaded metadata for {kind} on {server}");
self.data.update_partial(server, kind, metadata);
}
Message::UpdatePartial(server, kind, Err(error)) => {
log::warn!("failed to load history metadata for {kind} on {server}: {error}");
log::warn!("failed to load metadata for {kind} on {server}: {error}");
}
Message::UpdateReadMarker(server, kind, read_marker, Ok(_)) => {
log::debug!("updated read marker for {kind} on {server} to {read_marker}");
Expand All @@ -124,6 +132,24 @@ impl Manager {
"failed to update read marker for {kind} on {server} to {read_marker}: {error}"
);
}
Message::Exited(results) => {
let mut output = vec![];

for (server, kind, result) in results {
match result {
Ok(marker) => {
log::debug!("closed history for {kind} on {server}",);
output.push((server, kind, marker));
}
Err(error) => {
log::warn!("failed to close history for {kind} on {server}: {error}");
output.push((server, kind, None));
}
}
}

return Some(Event::Exited(output));
}
}

None
Expand All @@ -137,26 +163,17 @@ impl Manager {
&mut self,
server: Server,
kind: history::Kind,
) -> Option<impl Future<Output = (Server, history::Kind, Option<history::ReadMarker>)>> {
) -> Option<impl Future<Output = Message>> {
let history = self.data.map.get_mut(&server)?.remove(&kind)?;

Some(async move {
match history.close().await {
Ok(marker) => {
log::debug!("closed history for {kind} on {server}",);
(server, kind, marker)
}
Err(error) => {
log::warn!("failed to close history for {kind} on {server}: {error}");
(server, kind, None)
}
}
})
Some(
history
.close()
.map(|result| Message::Closed(server, kind, result)),
)
}

pub fn close_all(
&mut self,
) -> impl Future<Output = Vec<(Server, history::Kind, Option<history::ReadMarker>)>> {
pub fn exit(&mut self) -> impl Future<Output = Message> {
let map = std::mem::take(&mut self.data).map;

async move {
Expand All @@ -167,24 +184,7 @@ impl Manager {
})
});

let results = future::join_all(tasks).await;

let mut output = vec![];

for (server, kind, result) in results {
match result {
Ok(marker) => {
log::debug!("closed history for {kind} on {server}",);
output.push((server, kind, marker));
}
Err(error) => {
log::warn!("failed to close history for {kind} on {server}: {error}");
output.push((server, kind, None));
}
}
}

output
Message::Exited(future::join_all(tasks).await)
}
}

Expand Down Expand Up @@ -228,7 +228,7 @@ impl Manager {
pub fn update_read_marker(
&mut self,
server: Server,
kind: history::Kind,
kind: impl Into<history::Kind>,
read_marker: history::ReadMarker,
) -> Option<impl Future<Output = Message>> {
self.data.update_read_marker(server, kind, read_marker)
Expand Down Expand Up @@ -717,11 +717,13 @@ impl Data {
fn update_read_marker(
&mut self,
server: server::Server,
kind: history::Kind,
kind: impl Into<history::Kind>,
read_marker: history::ReadMarker,
) -> Option<impl Future<Output = Message>> {
use std::collections::hash_map;

let kind = kind.into();

match self
.map
.entry(server.clone())
Expand Down
2 changes: 1 addition & 1 deletion data/src/history/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use chrono::{format::SecondsFormat, DateTime, Utc};
use std::fmt;
use std::path::PathBuf;
use std::str::FromStr;

use chrono::{format::SecondsFormat, DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::fs;

Expand Down
81 changes: 50 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ mod url;
mod widget;
mod window;

use std::collections::HashSet;
use std::env;
use std::time::{Duration, Instant};

use chrono::Utc;
use data::config::{self, Config};
use data::history;
use data::history::manager::Broadcast;
use data::version::Version;
use data::{environment, server, version, Url, User};
use data::{history, Server};
use iced::widget::{column, container};
use iced::{padding, Length, Subscription, Task};
use screen::{dashboard, help, migration, welcome};
Expand Down Expand Up @@ -201,6 +202,7 @@ pub enum Screen {
Help(screen::Help),
Welcome(screen::Welcome),
Migration(screen::Migration),
Exit { pending_exit: HashSet<Server> },
}

#[derive(Debug)]
Expand Down Expand Up @@ -339,6 +341,16 @@ impl Halloy {
self.clients.quit(&server, None);
Task::none()
}
Some(dashboard::Event::Exit) => {
let pending_exit = self.clients.exit();

if pending_exit.is_empty() {
iced::exit()
} else {
self.screen = Screen::Exit { pending_exit };
Task::none()
}
}
None => Task::none(),
};

Expand Down Expand Up @@ -695,13 +707,11 @@ impl Halloy {
}
}
data::client::Event::UpdateReadMarker(target, read_marker) => {
let kind = history::Kind::from(target);

commands.push(
dashboard
.update_read_marker(
server.clone(),
kind,
target,
read_marker,
)
.map(Message::Dashboard),
Expand All @@ -727,34 +737,42 @@ impl Halloy {

Task::batch(commands)
}
stream::Update::Quit(server, reason) => {
let Screen::Dashboard(dashboard) = &mut self.screen else {
return Task::none();
};

self.servers.remove(&server);

if let Some(client) = self.clients.remove(&server) {
let user = client.nickname().to_owned().into();

let channels = client.channels().to_vec();
stream::Update::Quit(server, reason) => match &mut self.screen {
Screen::Dashboard(dashboard) => {
self.servers.remove(&server);

if let Some(client) = self.clients.remove(&server) {
let user = client.nickname().to_owned().into();

let channels = client.channels().to_vec();

dashboard
.broadcast(
&server,
&self.config,
Utc::now(),
Broadcast::Quit {
user,
comment: reason,
user_channels: channels,
},
)
.map(Message::Dashboard)
} else {
Task::none()
}
}
Screen::Exit { pending_exit } => {
pending_exit.remove(&server);

dashboard
.broadcast(
&server,
&self.config,
Utc::now(),
Broadcast::Quit {
user,
comment: reason,
user_channels: channels,
},
)
.map(Message::Dashboard)
} else {
Task::none()
if pending_exit.is_empty() {
iced::exit()
} else {
Task::none()
}
}
}
_ => Task::none(),
},
},
Message::Event(window, event) => {
// Events only enabled for main window
Expand Down Expand Up @@ -838,7 +856,7 @@ impl Halloy {
}
window::Event::CloseRequested => {
if let Screen::Dashboard(dashboard) = &mut self.screen {
return dashboard.exit(self.clients.take()).then(|_| iced::exit());
return dashboard.exit().map(Message::Dashboard);
} else {
return iced::exit();
}
Expand Down Expand Up @@ -882,6 +900,7 @@ impl Halloy {
Screen::Help(help) => help.view().map(Message::Help),
Screen::Welcome(welcome) => welcome.view().map(Message::Welcome),
Screen::Migration(migration) => migration.view().map(Message::Migration),
Screen::Exit { .. } => column![].into(),
};

let content = container(screen)
Expand Down
Loading

0 comments on commit 8abfb9f

Please sign in to comment.