chathistory Support #619

Open · wants to merge 8 commits into base: main
798 changes: 765 additions & 33 deletions data/src/client.rs

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions data/src/config/buffer.rs
@@ -23,6 +23,8 @@ pub struct Buffer {
pub internal_messages: InternalMessages,
#[serde(default)]
pub status_message_prefix: StatusMessagePrefix,
#[serde(default)]
pub chathistory: ChatHistory,
}

#[derive(Debug, Clone, Default, Deserialize)]
@@ -139,6 +141,12 @@ impl Default for InternalMessage {
}
}

#[derive(Debug, Clone, Default, Deserialize)]
pub struct ChatHistory {
#[serde(default)]
pub infinite_scroll: bool,
}

#[derive(Debug, Copy, Clone, Default, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum UsernameFormat {
7 changes: 7 additions & 0 deletions data/src/config/server.rs
@@ -96,6 +96,8 @@ pub struct Server {
/// A list of nicknames to monitor (if MONITOR is supported by the server).
#[serde(default)]
pub monitor: Vec<String>,
#[serde(default = "default_chathistory")]
pub chathistory: bool,
}

impl Server {
@@ -175,6 +177,7 @@ impl Default for Server {
who_poll_interval: default_who_poll_interval(),
who_retry_interval: default_who_retry_interval(),
monitor: Default::default(),
chathistory: default_chathistory(),
}
}
}
@@ -297,3 +300,7 @@ fn default_who_poll_interval() -> Duration {
fn default_who_retry_interval() -> Duration {
Duration::from_secs(10)
}

fn default_chathistory() -> bool {
true
}
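
For context: the server-level `chathistory` option defaults to `true`, so history is requested whenever a server advertises the capability unless the user explicitly disables it. A minimal sketch of how that serde default behaves (the struct below is a stand-in, not the real `Server` config):

```rust
// Illustrative only: a stripped-down stand-in for the real `Server` config,
// showing how `#[serde(default = "default_chathistory")]` fills in `true`
// when the key is omitted. Requires the `serde` (derive) and `toml` crates.
use serde::Deserialize;

fn default_chathistory() -> bool {
    true
}

#[derive(Debug, Deserialize)]
struct ServerSketch {
    nickname: String,
    #[serde(default = "default_chathistory")]
    chathistory: bool,
}

fn main() {
    // Key omitted: the default function supplies `true`.
    let implicit: ServerSketch = toml::from_str(r#"nickname = "halloy""#).unwrap();
    assert!(implicit.chathistory);

    // Explicit opt-out still deserializes as written.
    let explicit: ServerSketch = toml::from_str(
        r#"
        nickname = "halloy"
        chathistory = false
        "#,
    )
    .unwrap();
    assert!(!explicit.chathistory);
}
```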
145 changes: 142 additions & 3 deletions data/src/history.rs
@@ -9,8 +9,9 @@ use irc::proto;
use tokio::fs;
use tokio::time::Instant;

use crate::message::{self, MessageReferences};
use crate::user::Nick;
use crate::{buffer, compression, environment, message, Buffer, Message, Server};
use crate::{buffer, compression, environment, Buffer, Message, Server};

pub use self::manager::{Manager, Resource};
pub use self::metadata::{Metadata, ReadMarker};
@@ -157,7 +158,9 @@ pub async fn append(
let loaded = load(kind.clone()).await?;

let mut all_messages = loaded.messages;
all_messages.extend(messages);
messages.into_iter().for_each(|message| {
insert_message(&mut all_messages, message);
});

overwrite(kind, &all_messages, read_marker).await
}
@@ -203,6 +206,7 @@ pub enum History {
last_updated_at: Option<Instant>,
max_triggers_unread: Option<DateTime<Utc>>,
read_marker: Option<ReadMarker>,
chathistory_references: Option<MessageReferences>,
},
Full {
kind: Kind,
@@ -220,18 +224,23 @@ impl History {
last_updated_at: None,
max_triggers_unread: None,
read_marker: None,
chathistory_references: None,
}
}

pub fn update_partial(&mut self, metadata: Metadata) {
if let Self::Partial {
max_triggers_unread,
read_marker,
chathistory_references,
..
} = self
{
*read_marker = (*read_marker).max(metadata.read_marker);
*max_triggers_unread = (*max_triggers_unread).max(metadata.last_triggers_unread);
*chathistory_references = chathistory_references
.clone()
.max(metadata.chathistory_references);
}
}

@@ -279,7 +288,7 @@
} => {
*last_updated_at = Some(Instant::now());

messages.push(message);
insert_message(messages, message);
}
}
}
@@ -357,13 +366,15 @@

let read_marker = ReadMarker::latest(&messages).max(*read_marker);
let max_triggers_unread = metadata::latest_triggers_unread(&messages);
let chathistory_references = metadata::latest_can_reference(&messages);

*self = Self::Partial {
kind: kind.clone(),
messages: vec![],
last_updated_at: None,
read_marker,
max_triggers_unread,
chathistory_references,
};

Some(async move {
@@ -402,6 +413,48 @@
}
}

pub fn first_can_reference(&self) -> Option<&Message> {
match self {
History::Partial { messages, .. } | History::Full { messages, .. } => {
messages.iter().find(|message| message.can_reference())
}
}
}

pub fn last_can_reference_before(
&self,
server_time: DateTime<Utc>,
) -> Option<MessageReferences> {
match self {
History::Partial {
messages,
chathistory_references,
..
} => messages
.iter()
.rev()
.find(|message| message.can_reference() && message.server_time < server_time)
.map_or(
if chathistory_references
.as_ref()
.is_some_and(|chathistory_references| {
chathistory_references.timestamp < server_time
})
{
chathistory_references.clone()
} else {
None
},
|message| Some(message.references()),
),
History::Full { messages, .. } => messages
.iter()
.rev()
.find(|message| message.can_reference() && message.server_time < server_time)
.map(|message| message.references()),
}
}
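
For readers untangling the `map_or` in the `Partial` arm above: the newest cached message strictly older than `server_time` wins, and only if none exists does the stored `chathistory_references` value apply, and then only when it also predates `server_time`. A self-contained sketch of that fallback order with stand-in types (not the crate's real `Message` or `MessageReferences`):

```rust
// Sketch with stand-in types showing the fallback order used by
// `last_can_reference_before` on a Partial history. Requires the `chrono` crate.
use chrono::{DateTime, Duration, Utc};

#[derive(Debug, Clone, PartialEq)]
struct Refs {
    timestamp: DateTime<Utc>,
}

#[derive(Debug)]
struct Msg {
    server_time: DateTime<Utc>,
    referencable: bool,
}

fn last_refs_before(
    messages: &[Msg],
    stored: &Option<Refs>,
    server_time: DateTime<Utc>,
) -> Option<Refs> {
    messages
        .iter()
        .rev()
        // Prefer the newest cached message strictly older than `server_time`.
        .find(|m| m.referencable && m.server_time < server_time)
        .map(|m| Refs { timestamp: m.server_time })
        // Otherwise fall back to stored references that also predate it.
        .or_else(|| stored.clone().filter(|refs| refs.timestamp < server_time))
}

fn main() {
    let now = Utc::now();
    let messages = vec![Msg {
        server_time: now - Duration::minutes(5),
        referencable: true,
    }];
    let stored = Some(Refs {
        timestamp: now - Duration::hours(1),
    });

    // The cached message is newer than the stored references, so it wins.
    let refs = last_refs_before(&messages, &stored, now).unwrap();
    assert_eq!(refs.timestamp, now - Duration::minutes(5));
}
```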

pub fn update_read_marker(&mut self, read_marker: ReadMarker) {
let stored = match self {
History::Partial { read_marker, .. } => read_marker,
@@ -420,6 +473,92 @@
}
}

/// Insert the incoming message into the provided vector, sorted
/// on server time
///
/// Deduplication is only checked +/- 1 second around the server time
/// of the incoming message. Either message IDs match, or server times
/// have an exact match + target & content.
pub fn insert_message(messages: &mut Vec<Message>, message: Message) {
Comment on lines +476 to +482

Member:

Didn't we have a commit which used a binary search implementation for inserting messages?

Collaborator (Author):

Yes, this is the insert_message from that work we did together, completely unchanged. I was lazy about persisting the commit history while refactoring, so it's all been squashed into one commit.

The one thing this doesn't keep from that work is the sorting of saved histories on load. In that version of the chathistory PR, I had changed the history storage scheme so it was no longer obfuscated (no longer using hashes to name the files). If a history was not found under the non-hashed scheme (where histories can be expected to be sorted already), it would attempt to load and sort a history from the old, hashed scheme. I still like that idea, but this PR seemed to have enough changes already, so I omitted it this time around.

Member:

Ahh my bad, I just totally forgot what the code looked like and didn't realize that was the implementation we landed on :P
const FUZZ_SECONDS: chrono::Duration = chrono::Duration::seconds(1);

if messages.is_empty() {
messages.push(message);

return;
}

let start = message.server_time - FUZZ_SECONDS;
let end = message.server_time + FUZZ_SECONDS;

let start_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&start)) {
Ok(match_index) => match_index,
Err(sorted_insert_index) => sorted_insert_index,
};
let end_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&end)) {
Ok(match_index) => match_index,
Err(sorted_insert_index) => sorted_insert_index,
};

let mut current_index = start_index;
let mut insert_at = start_index;
let mut replace_at = None;

for stored in &messages[start_index..end_index] {
if (message.id.is_some() && stored.id == message.id)
|| ((stored.server_time == message.server_time
|| (matches!(stored.direction, message::Direction::Sent)
&& matches!(message.direction, message::Direction::Received)))
&& has_matching_content(stored, &message))
{
replace_at = Some(current_index);
break;
}

if message.server_time >= stored.server_time {
insert_at = current_index + 1;
}

current_index += 1;
}

if let Some(index) = replace_at {
if has_matching_content(&messages[index], &message) {
messages[index].id = message.id;
messages[index].received_at = message.received_at;
} else {
messages[index] = message;
}
} else {
messages.insert(insert_at, message);
}
}
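
To make the ±1 second window concrete, here is a standalone sketch of the same dedup-and-insert idea with a pared-down message type; the real function additionally compares message IDs, targets, and send/receive direction, and the stand-in names below are illustrative only:

```rust
// Standalone sketch of the windowed, binary-search-based dedup used by
// `insert_message`, with a simplified message type. Requires the `chrono` crate.
use chrono::{DateTime, Duration, Utc};

#[derive(Debug, Clone, PartialEq)]
struct Msg {
    server_time: DateTime<Utc>,
    content: String,
}

fn insert_sorted(messages: &mut Vec<Msg>, message: Msg) {
    let start = message.server_time - Duration::seconds(1);
    let end = message.server_time + Duration::seconds(1);

    // Binary search narrows the scan to the ±1 s window around the new message.
    let start_index = messages
        .binary_search_by(|stored| stored.server_time.cmp(&start))
        .unwrap_or_else(|index| index);
    let end_index = messages
        .binary_search_by(|stored| stored.server_time.cmp(&end))
        .unwrap_or_else(|index| index);

    // Inside the window, an exact time + content match counts as a duplicate.
    if messages[start_index..end_index]
        .iter()
        .any(|stored| {
            stored.server_time == message.server_time && stored.content == message.content
        })
    {
        return;
    }

    // Otherwise insert after every stored message with an earlier-or-equal timestamp.
    let offset = messages[start_index..end_index]
        .iter()
        .take_while(|stored| message.server_time >= stored.server_time)
        .count();
    messages.insert(start_index + offset, message);
}

fn main() {
    let now = Utc::now();
    let mut messages = Vec::new();

    insert_sorted(&mut messages, Msg { server_time: now, content: "hello".into() });
    // Replaying the same message (e.g. from a CHATHISTORY batch) does not duplicate it.
    insert_sorted(&mut messages, Msg { server_time: now, content: "hello".into() });
    assert_eq!(messages.len(), 1);
}
```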

/// The content of JOIN, PART, and QUIT messages may be dependent on how
/// the user attributes are resolved. Match those messages based on Nick
/// alone (covered by comparing target components) to avoid false negatives.
fn has_matching_content(message: &Message, other: &Message) -> bool {
if message.target == other.target {
if let message::Source::Server(Some(source)) = message.target.source() {
match source.kind() {
message::source::server::Kind::Join
| message::source::server::Kind::Part
| message::source::server::Kind::Quit => {
return true;
}
message::source::server::Kind::ReplyTopic
| message::source::server::Kind::ChangeHost
| message::source::server::Kind::MonitoredOnline
| message::source::server::Kind::MonitoredOffline => (),
}
}

message.content == other.content
} else {
false
}
}

#[derive(Debug)]
pub struct View<'a> {
pub total: usize,
61 changes: 56 additions & 5 deletions data/src/history/manager.rs
@@ -5,7 +5,7 @@ use futures::future::BoxFuture;
use futures::{future, Future, FutureExt};
use tokio::time::Instant;

use crate::history::{self, History};
use crate::history::{self, History, MessageReferences};
use crate::message::{self, Limit};
use crate::user::Nick;
use crate::{buffer, config, input};
@@ -238,12 +238,32 @@
self.data.update_read_marker(kind, read_marker)
}

pub fn channel_joined(
pub fn load_metadata(
&mut self,
server: Server,
channel: String,
) -> Option<impl Future<Output = Message>> {
self.data.channel_joined(server, channel)
self.data.load_metadata(server, channel)
}

pub fn first_can_reference(
&self,
server: Server,
chantypes: &[char],
target: String,
) -> Option<&crate::Message> {
self.data.first_can_reference(server, chantypes, target)
}

pub fn last_can_reference_before(
&self,
server: Server,
chantypes: &[char],
target: String,
server_time: DateTime<Utc>,
) -> Option<MessageReferences> {
self.data
.last_can_reference_before(server, chantypes, target, server_time)
}

pub fn get_messages(
@@ -466,7 +486,11 @@
let read_marker = (*partial_read_marker).max(metadata.read_marker);

let last_updated_at = *last_updated_at;
messages.extend(std::mem::take(new_messages));
std::mem::take(new_messages)
.into_iter()
.for_each(|message| {
history::insert_message(&mut messages, message);
});
entry.insert(History::Full {
kind,
messages,
@@ -717,7 +741,7 @@
}
}

fn channel_joined(
fn load_metadata(
&mut self,
server: server::Server,
channel: String,
@@ -743,6 +767,33 @@
}
}

fn first_can_reference(
&self,
server: server::Server,
chantypes: &[char],
target: String,
) -> Option<&crate::Message> {
let kind = history::Kind::from_target(server, target, chantypes);

self.map
.get(&kind)
.and_then(|history| history.first_can_reference())
}

fn last_can_reference_before(
&self,
server: Server,
chantypes: &[char],
target: String,
server_time: DateTime<Utc>,
) -> Option<MessageReferences> {
let kind = history::Kind::from_target(server, target, chantypes);

self.map
.get(&kind)
.and_then(|history| history.last_can_reference_before(server_time))
}

fn untrack(
&mut self,
kind: &history::Kind,