Skip to content

Commit

Permalink
feat!(send queue): add a priority field to maintain ordering of sending
Browse files Browse the repository at this point in the history
Prior to this patch, the send queue would not maintain the ordering of
sending a media *then* a text, because it would push back a dependent
request graduating into a queued request.

The solution implemented here consists in adding a new priority column
to the send queue, defaulting to 0 for existing events, and use higher
priorities for the media uploads, so they're considered before other
requests.

A high priority is also used for aggregation events that are sent late,
so they're sent as soon as possible, before other subsequent events.
  • Loading branch information
bnjbvr committed Nov 14, 2024
1 parent 2872af2 commit c02d8ce
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 23 deletions.
78 changes: 72 additions & 6 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub trait StateStoreIntegrationTests {
async fn test_display_names_saving(&self);
/// Test operations with the send queue.
async fn test_send_queue(&self);
/// Test priority of operations with the send queue.
async fn test_send_queue_priority(&self);
/// Test operations related to send queue dependents.
async fn test_send_queue_dependents(&self);
/// Test saving/restoring server capabilities.
Expand Down Expand Up @@ -1212,7 +1214,7 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();

// Reading it will work.
let pending = self.load_send_queue_requests(room_id).await.unwrap();
Expand All @@ -1236,7 +1238,7 @@ impl StateStoreIntegrationTests for DynStateStore {
)
.unwrap();

self.save_send_queue_request(room_id, txn, event.into()).await.unwrap();
self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap();
}

// Reading all the events should work.
Expand Down Expand Up @@ -1334,7 +1336,7 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
.unwrap();
self.save_send_queue_request(room_id2, txn.clone(), event.into()).await.unwrap();
self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap();
}

// Add and remove one event for room3.
Expand All @@ -1344,7 +1346,7 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
.unwrap();
self.save_send_queue_request(room_id3, txn.clone(), event.into()).await.unwrap();
self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap();

self.remove_send_queue_request(room_id3, &txn).await.unwrap();
}
Expand All @@ -1357,6 +1359,64 @@ impl StateStoreIntegrationTests for DynStateStore {
assert!(outstanding_rooms.iter().any(|room| room == room_id2));
}

async fn test_send_queue_priority(&self) {
let room_id = room_id!("!test_send_queue:localhost");

// No queued event in store at first.
let events = self.load_send_queue_requests(room_id).await.unwrap();
assert!(events.is_empty());

// Saving one request should work.
let low0_txn = TransactionId::new();
let ev0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
.unwrap();
self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap();

// Saving one request with higher priority should work.
let high_txn = TransactionId::new();
let ev1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
.unwrap();
self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap();

// Saving another request with the low priority should work.
let low1_txn = TransactionId::new();
let ev2 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
.unwrap();
self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap();

// The requests should be ordered from higher priority to lower, and when equal,
// should use the insertion order instead.
let pending = self.load_send_queue_requests(room_id).await.unwrap();

assert_eq!(pending.len(), 3);
{
assert_eq!(pending[0].transaction_id, high_txn);

let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "high");
}

{
assert_eq!(pending[1].transaction_id, low0_txn);

let deserialized = pending[1].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "low0");
}

{
assert_eq!(pending[2].transaction_id, low1_txn);

let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "low1");
}
}

async fn test_send_queue_dependents(&self) {
let room_id = room_id!("!test_send_queue_dependents:localhost");

Expand All @@ -1365,7 +1425,7 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();

// No dependents, to start with.
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
Expand Down Expand Up @@ -1427,7 +1487,7 @@ impl StateStoreIntegrationTests for DynStateStore {
let event1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
.unwrap();
self.save_send_queue_request(room_id, txn1.clone(), event1.into()).await.unwrap();
self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap();

self.save_dependent_queued_request(
room_id,
Expand Down Expand Up @@ -1609,6 +1669,12 @@ macro_rules! statestore_integration_tests {
store.test_send_queue().await;
}

#[async_test]
async fn test_send_queue_priority() {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_send_queue_priority().await;
}

#[async_test]
async fn test_send_queue_dependents() {
let store = get_store().await.expect("creating store failed").into_state_store();
Expand Down
9 changes: 7 additions & 2 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,13 +807,14 @@ impl StateStore for MemoryStore {
room_id: &RoomId,
transaction_id: OwnedTransactionId,
kind: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.send_queue_events
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default()
.push(QueuedRequest { kind, transaction_id, error: None });
.push(QueuedRequest { kind, transaction_id, error: None, priority });
Ok(())
}

Expand Down Expand Up @@ -867,7 +868,11 @@ impl StateStore for MemoryStore {
&self,
room_id: &RoomId,
) -> Result<Vec<QueuedRequest>, Self::Error> {
Ok(self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone())
let mut ret =
self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone();
// Inverted order of priority, use stable sort to keep insertion order.
ret.sort_by(|lhs, rhs| rhs.priority.cmp(&lhs.priority));
Ok(ret)
}

async fn update_send_queue_request_status(
Expand Down
6 changes: 6 additions & 0 deletions crates/matrix-sdk-base/src/store/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ pub struct QueuedRequest {
///
/// `None` if the request is in the queue, waiting to be sent.
pub error: Option<QueueWedgeError>,

/// At which priority should this be handled?
///
/// The bigger the value, the higher the priority at which this request
/// should be handled.
pub priority: usize,
}

impl QueuedRequest {
Expand Down
11 changes: 10 additions & 1 deletion crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
transaction_id: OwnedTransactionId,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error>;

/// Updates a send queue request with the given content, and resets its
Expand Down Expand Up @@ -390,6 +391,10 @@ pub trait StateStore: AsyncTraitDeps {
) -> Result<bool, Self::Error>;

/// Loads all the send queue requests for the given room.
///
/// The resulting vector of queued requests should be ordered from higher
/// priority to lower priority, and respect the insertion order when
/// priorities are equal.
async fn load_send_queue_requests(
&self,
room_id: &RoomId,
Expand Down Expand Up @@ -641,8 +646,12 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
room_id: &RoomId,
transaction_id: OwnedTransactionId,
content: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.0.save_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
self.0
.save_send_queue_request(room_id, transaction_id, content, priority)
.await
.map_err(Into::into)
}

async fn update_send_queue_request(
Expand Down
14 changes: 12 additions & 2 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ struct PersistedQueuedRequest {

pub error: Option<QueueWedgeError>,

priority: Option<usize>,

// Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
/// Deprecated (from old format), now replaced with error field.
is_wedged: Option<bool>,
Expand All @@ -459,7 +461,10 @@ impl PersistedQueuedRequest {
_ => self.error,
};

Some(QueuedRequest { kind, transaction_id: self.transaction_id, error })
// By default, events without a priority have a priority of 0.
let priority = self.priority.unwrap_or(0);

Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority })
}
}

Expand Down Expand Up @@ -1329,6 +1334,7 @@ impl_state_store!({
room_id: &RoomId,
transaction_id: OwnedTransactionId,
kind: QueuedRequestKind,
priority: usize,
) -> Result<()> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);

Expand Down Expand Up @@ -1357,6 +1363,7 @@ impl_state_store!({
error: None,
is_wedged: None,
event: None,
priority: Some(priority),
});

// Save the new vector into db.
Expand Down Expand Up @@ -1460,11 +1467,14 @@ impl_state_store!({
.get(&encoded_key)?
.await?;

let prev = prev.map_or_else(
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
)?;

// Inverted stable ordering on priority.
prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));

Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Add a priority column, defaulting to 0 for all events in the send queue.
ALTER TABLE "send_queue_events"
ADD COLUMN "priority" INTEGER NOT NULL DEFAULT 0;
23 changes: 18 additions & 5 deletions crates/matrix-sdk-sqlite/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ mod keys {
/// This is used to figure whether the sqlite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`SqliteStateStore::run_migrations`] function..
const DATABASE_VERSION: u8 = 9;
const DATABASE_VERSION: u8 = 10;

/// A sqlite based cryptostore.
#[derive(Clone)]
Expand Down Expand Up @@ -307,6 +307,17 @@ impl SqliteStateStore {
.await?;
}

if from < 10 && to >= 10 {
conn.with_transaction(move |txn| {
// Run the migration.
txn.execute_batch(include_str!(
"../migrations/state_store/009_send_queue_priority.sql"
))?;
txn.set_db_version(10)
})
.await?;
}

Ok(())
}

Expand Down Expand Up @@ -1685,6 +1696,7 @@ impl StateStore for SqliteStateStore {
room_id: &RoomId,
transaction_id: OwnedTransactionId,
content: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
let room_id_value = self.serialize_value(&room_id.to_owned())?;
Expand All @@ -1699,7 +1711,7 @@ impl StateStore for SqliteStateStore {
self.acquire()
.await?
.with_transaction(move |txn| {
txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content) VALUES (?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content))?;
txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority) VALUES (?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority))?;
Ok(())
})
.await
Expand Down Expand Up @@ -1761,14 +1773,14 @@ impl StateStore for SqliteStateStore {
// Note: ROWID is always present and is an auto-incremented integer counter. We
// want to maintain the insertion order, so we can sort using it.
// Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = self
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize)> = self
.acquire()
.await?
.prepare(
"SELECT transaction_id, content, wedge_reason FROM send_queue_events WHERE room_id = ? ORDER BY ROWID",
"SELECT transaction_id, content, wedge_reason, priority FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
|mut stmt| {
stmt.query((room_id,))?
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
.collect()
},
)
Expand All @@ -1780,6 +1792,7 @@ impl StateStore for SqliteStateStore {
transaction_id: entry.0.into(),
kind: self.deserialize_json(&entry.1)?,
error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
priority: entry.3,
});
}

Expand Down
Loading

0 comments on commit c02d8ce

Please sign in to comment.