Skip to content

Commit

Permalink
Persist last triggers so we only need to load metadata for partial
Browse files Browse the repository at this point in the history
  • Loading branch information
tarkah committed Sep 23, 2024
1 parent b06d606 commit 07ba40f
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 86 deletions.
84 changes: 39 additions & 45 deletions data/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ pub async fn overwrite(
server: &server::Server,
kind: &Kind,
messages: &[Message],
metadata: &Metadata,
read_marker: Option<ReadMarker>,
) -> Result<(), Error> {
if messages.is_empty() {
return metadata::save(server, kind, metadata).await;
return metadata::save(server, kind, messages, read_marker).await;
}

let latest = &messages[messages.len().saturating_sub(MAX_MESSAGES)..];
Expand All @@ -110,7 +110,7 @@ pub async fn overwrite(

fs::write(path, &compressed).await?;

metadata::save(server, kind, metadata).await?;
metadata::save(server, kind, latest, read_marker).await?;

Ok(())
}
Expand All @@ -119,18 +119,18 @@ pub async fn append(
server: &server::Server,
kind: &Kind,
messages: Vec<Message>,
metadata: &Metadata,
read_marker: Option<ReadMarker>,
) -> Result<(), Error> {
if messages.is_empty() {
return metadata::save(server, kind, metadata).await;
return metadata::save(server, kind, &messages, read_marker).await;
}

let loaded = load(server.clone(), kind.clone()).await?;

let mut all_messages = loaded.messages;
all_messages.extend(messages);

overwrite(server, kind, &all_messages, metadata).await
overwrite(server, kind, &all_messages, read_marker).await
}

async fn read_all(path: &PathBuf) -> Result<Vec<Message>, Error> {
Expand Down Expand Up @@ -164,29 +164,22 @@ async fn path(server: &server::Server, kind: &Kind) -> Result<PathBuf, Error> {
Ok(dir.join(format!("{hashed_name}.json.gz")))
}

fn last_triggers_unread(messages: &[Message]) -> Option<DateTime<Utc>> {
messages
.iter()
.rev()
.find_map(|message| message.triggers_unread().then_some(message.server_time))
}

#[derive(Debug)]
pub enum History {
Partial {
server: server::Server,
kind: Kind,
messages: Vec<Message>,
last_updated_at: Option<Instant>,
metadata: Metadata,
max_triggers_unread: Option<DateTime<Utc>>,
read_marker: Option<ReadMarker>,
},
Full {
server: server::Server,
kind: Kind,
messages: Vec<Message>,
last_updated_at: Option<Instant>,
metadata: Metadata,
read_marker: Option<ReadMarker>,
},
}

Expand All @@ -197,32 +190,32 @@ impl History {
kind,
messages: vec![],
last_updated_at: None,
metadata: Metadata::default(),
max_triggers_unread: None,
read_marker: None,
}
}

pub fn update_partial(&mut self, loaded: Loaded) {
pub fn update_partial(&mut self, metadata: Metadata) {
if let Self::Partial {
metadata,
max_triggers_unread,
read_marker,
..
} = self
{
*metadata = metadata.merge(loaded.metadata);
*max_triggers_unread = last_triggers_unread(&loaded.messages).max(*max_triggers_unread);
*read_marker = (*read_marker).max(metadata.read_marker);
*max_triggers_unread = (*max_triggers_unread).max(metadata.last_triggers_unread);
}
}

fn has_unread(&self) -> bool {
match self {
History::Partial {
metadata,
max_triggers_unread,
read_marker,
..
} => {
// Read marker is prior to last known message which triggers unread
if let Some(read_marker) = metadata.read_marker {
if let Some(read_marker) = read_marker {
max_triggers_unread.is_some_and(|max| read_marker.date_time() < max)
}
// Default state == unread if theres messages that trigger indicator
Expand Down Expand Up @@ -269,8 +262,8 @@ impl History {
server,
kind,
messages,
metadata,
last_updated_at,
read_marker,
..
} => {
if let Some(last_received) = *last_updated_at {
Expand All @@ -279,13 +272,13 @@ impl History {
if since >= FLUSH_AFTER_LAST_RECEIVED {
let server = server.clone();
let kind = kind.clone();
let metadata = *metadata;
let messages = std::mem::take(messages);
let read_marker = *read_marker;

*last_updated_at = None;

return Some(
async move { append(&server, &kind, messages, &metadata).await }
async move { append(&server, &kind, messages, read_marker).await }
.boxed(),
);
}
Expand All @@ -297,8 +290,8 @@ impl History {
server,
kind,
messages,
metadata,
last_updated_at,
read_marker,
..
} => {
if let Some(last_received) = *last_updated_at {
Expand All @@ -307,7 +300,7 @@ impl History {
if since >= FLUSH_AFTER_LAST_RECEIVED && !messages.is_empty() {
let server = server.clone();
let kind = kind.clone();
let metadata = *metadata;
let read_marker = *read_marker;
*last_updated_at = None;

if messages.len() > MAX_MESSAGES {
Expand All @@ -317,7 +310,7 @@ impl History {
let messages = messages.clone();

return Some(
async move { overwrite(&server, &kind, &messages, &metadata).await }
async move { overwrite(&server, &kind, &messages, read_marker).await }
.boxed(),
);
}
Expand All @@ -335,28 +328,29 @@ impl History {
server,
kind,
messages,
metadata,
read_marker,
..
} => {
let server = server.clone();
let kind = kind.clone();
let messages = std::mem::take(messages);

let metadata = metadata.updated(&messages);
let read_marker = ReadMarker::latest(&messages).max(*read_marker);
let max_triggers_unread = metadata::find_latest_triggers(&messages);

*self = Self::Partial {
server: server.clone(),
kind: kind.clone(),
messages: vec![],
last_updated_at: None,
metadata,
max_triggers_unread: last_triggers_unread(&messages),
read_marker,
max_triggers_unread,
};

Some(async move {
overwrite(&server, &kind, &messages, &metadata)
overwrite(&server, &kind, &messages, read_marker)
.await
.map(|_| metadata.read_marker)
.map(|_| read_marker)
})
}
}
Expand All @@ -368,36 +362,36 @@ impl History {
server,
kind,
messages,
metadata,
read_marker,
..
} => {
append(&server, &kind, messages, &metadata).await?;
append(&server, &kind, messages, read_marker).await?;

Ok(metadata.read_marker)
Ok(read_marker)
}
History::Full {
server,
kind,
messages,
metadata,
read_marker,
..
} => {
let metadata = metadata.updated(&messages);
let read_marker = ReadMarker::latest(&messages).max(read_marker);

overwrite(&server, &kind, &messages, &metadata).await?;
overwrite(&server, &kind, &messages, read_marker).await?;

Ok(metadata.read_marker)
Ok(read_marker)
}
}
}

pub fn update_read_marker(&mut self, read_marker: ReadMarker) {
let metadata = match self {
History::Partial { metadata, .. } => metadata,
History::Full { metadata, .. } => metadata,
let stored = match self {
History::Partial { read_marker, .. } => read_marker,
History::Full { read_marker, .. } => read_marker,
};

metadata.update_read_marker(read_marker);
*stored = (*stored).max(Some(read_marker));
}
}

Expand Down
28 changes: 15 additions & 13 deletions data/src/history/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub enum Message {
UpdatePartial(
server::Server,
history::Kind,
Result<history::Loaded, history::Error>,
Result<history::Metadata, history::Error>,
),
Closed(
server::Server,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Manager {
log::warn!("failed to flush history for {kind} on {server}: {error}")
}
Message::UpdatePartial(server, kind, Ok(loaded)) => {
log::debug!("updating partial history for {kind} on {server}");
log::debug!("updating metadata for {kind} on {server}");
self.data.update_partial(server, kind, loaded);
}
Message::UpdatePartial(server, kind, Err(error)) => {
Expand Down Expand Up @@ -472,10 +472,10 @@ impl Data {
History::Partial {
messages: new_messages,
last_updated_at,
metadata: partial_metadata,
read_marker: partial_read_marker,
..
} => {
let metadata = partial_metadata.merge(metadata);
let read_marker = (*partial_read_marker).max(metadata.read_marker);

let last_updated_at = *last_updated_at;
messages.extend(std::mem::take(new_messages));
Expand All @@ -484,7 +484,7 @@ impl Data {
kind,
messages,
last_updated_at,
metadata,
read_marker,
});
}
_ => {
Expand All @@ -493,7 +493,7 @@ impl Data {
kind,
messages,
last_updated_at: None,
metadata,
read_marker: metadata.read_marker,
});
}
},
Expand All @@ -503,7 +503,7 @@ impl Data {
kind,
messages,
last_updated_at: None,
metadata,
read_marker: metadata.read_marker,
});
}
}
Expand All @@ -513,7 +513,7 @@ impl Data {
&mut self,
server: server::Server,
kind: history::Kind,
data: history::Loaded,
data: history::Metadata,
) {
if let Some(history) = self.map.get_mut(&server).and_then(|map| map.get_mut(&kind)) {
history.update_partial(data);
Expand All @@ -528,7 +528,9 @@ impl Data {
buffer_config: &config::Buffer,
) -> Option<history::View> {
let History::Full {
messages, metadata, ..
messages,
read_marker,
..
} = self.map.get(server)?.get(kind)?
else {
return None;
Expand Down Expand Up @@ -643,7 +645,7 @@ impl Data {

let limited = with_limit(limit, filtered.into_iter());

let split_at = metadata.read_marker.map_or(0, |read_marker| {
let split_at = read_marker.map_or(0, |read_marker| {
limited
.iter()
.rev()
Expand Down Expand Up @@ -688,7 +690,7 @@ impl Data {

Some(
async move {
let loaded = history::load(server.clone(), kind.clone()).await;
let loaded = history::metadata::load(server.clone(), kind.clone()).await;

Message::UpdatePartial(server, kind, loaded)
}
Expand Down Expand Up @@ -724,7 +726,7 @@ impl Data {

Some(
async move {
let loaded = history::load(server.clone(), kind.clone()).await;
let loaded = history::metadata::load(server.clone(), kind.clone()).await;

Message::UpdatePartial(server, kind, loaded)
}
Expand Down Expand Up @@ -755,7 +757,7 @@ impl Data {

Some(
async move {
let loaded = history::load(server.clone(), kind.clone()).await;
let loaded = history::metadata::load(server.clone(), kind.clone()).await;

Message::UpdatePartial(server, kind, loaded)
}
Expand Down
Loading

0 comments on commit 07ba40f

Please sign in to comment.