Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-design asc msg trigger ledger changes #4738

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 46 additions & 97 deletions massa-execution-worker/src/speculative_async_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
//! The speculative asynchronous pool represents the state of
//! the pool at an arbitrary execution slot.

use crate::active_history::{ActiveHistory, HistorySearchResult::Present};
use crate::active_history::{
ActiveHistory,
HistorySearchResult::{self, Present},
};
use massa_async_pool::{
AsyncMessage, AsyncMessageId, AsyncMessageInfo, AsyncMessageTrigger, AsyncMessageUpdate,
AsyncPoolChanges,
Expand Down Expand Up @@ -97,8 +100,7 @@ impl SpeculativeAsyncPool {
}

/// Takes a batch of asynchronous messages to execute,
/// removing them from the speculative asynchronous pool and settling their deletion from it
/// in the changes accumulator.
/// removing them from the speculative asynchronous pool and settling their deletion from it in the changes accumulator.
///
/// # Arguments
/// * `slot`: slot at which the batch is taken (allows filtering by validity interval)
Expand All @@ -123,14 +125,9 @@ impl SpeculativeAsyncPool {

for (message_id, message_info) in message_infos.iter() {
let corrected_max_gas = message_info.max_gas.saturating_add(async_msg_cst_gas_cost);
// Note: SecureShareOperation.get_validity_range(...) returns RangeInclusive
// so to be consistent here, use >= & <= checks
if available_gas >= corrected_max_gas
&& Self::is_message_ready_to_execute(
&slot,
&message_info.validity_start,
&message_info.validity_end,
)
&& slot >= message_info.validity_start
&& slot < message_info.validity_end
&& message_info.can_be_executed
{
available_gas -= corrected_max_gas;
Expand Down Expand Up @@ -164,26 +161,26 @@ impl SpeculativeAsyncPool {
) -> Vec<(AsyncMessageId, AsyncMessage)> {
// Update the messages_info: remove messages that should be removed
// Filter out all messages for which the validity end is expired.
// Note: that the validity_end bound is included in the validity interval of the message.
// Note that the validity_end bound is NOT included in the validity interval of the message.

let mut eliminated_infos = Vec::new();
self.message_infos.retain(|id, info| {
if Self::is_message_expired(slot, &info.validity_end) {
if *slot < info.validity_end {
true
} else {
eliminated_infos.push((*id, info.clone()));
false
} else {
true
}
});

let mut eliminated_new_messages = Vec::new();
self.pool_changes.0.retain(|k, v| match v {
SetUpdateOrDelete::Set(message) => {
if Self::is_message_expired(slot, &message.validity_end) {
if *slot < message.validity_end {
true
} else {
eliminated_new_messages.push((*k, v.clone()));
false
} else {
true
}
}
SetUpdateOrDelete::Update(_v) => true,
Expand All @@ -196,7 +193,7 @@ impl SpeculativeAsyncPool {
SetUpdateOrDelete::Delete => None,
}));

// Truncate message pool to its max size, removing non-priority items
// Truncate message pool to its max size, removing non-prioritary items
let excess_count = self
.message_infos
.len()
Expand All @@ -209,14 +206,17 @@ impl SpeculativeAsyncPool {

// Activate the messages that can be activated (triggered)
let mut triggered_info = Vec::new();
for (id, message_info) in self.message_infos.iter_mut() {
let mut message_infos = self.message_infos.clone();

for (id, message_info) in message_infos.iter_mut() {
if let Some(filter) = &message_info.trigger {
if is_triggered(filter, ledger_changes) {
if self.is_triggered(filter, ledger_changes) {
message_info.can_be_executed = true;
triggered_info.push((*id, message_info.clone()));
}
}
}
self.message_infos = message_infos;

// Query triggered messages
let triggered_msg =
Expand Down Expand Up @@ -317,82 +317,31 @@ impl SpeculativeAsyncPool {
msgs
}

/// Return true if a message (given its validity end) is expired
/// Must be consistent with is_message_valid
fn is_message_expired(slot: &Slot, message_validity_end: &Slot) -> bool {
// Note: SecureShareOperation.get_validity_range(...) returns RangeInclusive
// (for operation validity) so apply the same rule for message validity
*slot > *message_validity_end
}

/// Return true if a message (given its validity_start & validity end) is ready to execute
/// Must be consistent with is_message_expired
fn is_message_ready_to_execute(
slot: &Slot,
message_validity_start: &Slot,
message_validity_end: &Slot,
) -> bool {
// Note: SecureShareOperation.get_validity_range(...) returns RangeInclusive
// (for operation validity) so apply the same rule for message validity
slot >= message_validity_start && slot <= message_validity_end
}
}

/// Check in the ledger changes if a message trigger has been triggered
fn is_triggered(filter: &AsyncMessageTrigger, ledger_changes: &LedgerChanges) -> bool {
ledger_changes.has_changes(&filter.address, filter.datastore_key.clone())
}

#[cfg(test)]
mod tests {
use super::*;

// Test if is_message_expired & is_message_ready_to_execute are consistent
#[test]
fn test_validity() {
let slot1 = Slot::new(6, 0);
let slot2 = Slot::new(9, 0);
let slot_validity_start = Slot::new(4, 0);
let slot_validity_end = Slot::new(8, 0);

assert!(!SpeculativeAsyncPool::is_message_expired(
&slot1,
&slot_validity_end
));
assert!(SpeculativeAsyncPool::is_message_ready_to_execute(
&slot1,
&slot_validity_start,
&slot_validity_end
));

assert!(!SpeculativeAsyncPool::is_message_expired(
&slot_validity_start,
&slot_validity_end
));
assert!(SpeculativeAsyncPool::is_message_ready_to_execute(
&slot_validity_start,
&slot_validity_start,
&slot_validity_end
));

assert!(!SpeculativeAsyncPool::is_message_expired(
&slot_validity_end,
&slot_validity_end
));
assert!(SpeculativeAsyncPool::is_message_ready_to_execute(
&slot_validity_end,
&slot_validity_start,
&slot_validity_end
));

assert!(SpeculativeAsyncPool::is_message_expired(
&slot2,
&slot_validity_end
));
assert!(!SpeculativeAsyncPool::is_message_ready_to_execute(
&slot2,
&slot_validity_start,
&slot_validity_end
));
/// Check in the ledger changes if a message trigger has been triggered
fn is_triggered(&self, filter: &AsyncMessageTrigger, ledger_changes: &LedgerChanges) -> bool {
let addr = filter.address;
let key = filter.datastore_key.clone();

match key {
Some(key) => {
let activ_val = self
.active_history
.read()
.fetch_active_history_data_entry(&addr, &key);

let latest_val = match activ_val {
HistorySearchResult::Present(val) => Some(val),
HistorySearchResult::Absent => None,
HistorySearchResult::NoInfo => self
.final_state
.read()
.get_ledger()
.get_data_entry(&addr, &key),
};

ledger_changes.has_changes(&addr, Some(key), latest_val)
}
None => ledger_changes.has_changes(&addr, None, None),
}
}
}
46 changes: 37 additions & 9 deletions massa-ledger-exports/src/ledger_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,21 +844,49 @@ impl LedgerChanges {
///
/// # Returns
/// * true if the address and, optionally the datastore key, exists in the ledger changes
pub fn has_changes(&self, addr: &Address, key: Option<Vec<u8>>) -> bool {
pub fn has_changes(
&self,
addr: &Address,
key: Option<Vec<u8>>,
prev_val: Option<Vec<u8>>,
) -> bool {
// Get the current changes being applied to the ledger entry associated to that address
match self.0.get(addr) {
// This ledger entry is being replaced by a new one:
// check if the new ledger entry has a datastore entry for the provided key
Some(SetUpdateOrDelete::Set(v)) => key.map_or(true, |k| v.datastore.contains_key(&k)),

// This ledger entry is being replaced by a new one
// The address may have been deleted in the same slot, in any case the address has changed
Some(SetUpdateOrDelete::Set(entry)) => {
match key {
// If the key is present, check if the new entry has the key in its datastore
Some(k) => match prev_val {
// If the previous value is present, check if it's an update
Some(pv) => entry.datastore.get(&k).map_or(false, |v| v == &pv),
// If the previous value is not present, the address has changed
None => false,
},
// If the key is not present, the address has changed
None => true,
}
}
// This ledger entry is being updated
Some(SetUpdateOrDelete::Update(LedgerEntryUpdate { datastore, .. })) => {
// Check if the update being applied to that datastore entry
key.map_or(true, |k| datastore.contains_key(&k))
// Check if the update being applied to that datastore entry, and compare to the previous value
key.map_or(true, |k| match datastore.get(&k) {
Some(val) => match (val, prev_val) {
(SetOrDelete::Set(_), None) => true,
(SetOrDelete::Set(v), Some(pv)) => *v != pv,
(SetOrDelete::Delete, None) => false,
(SetOrDelete::Delete, Some(_)) => true,
},
None => false,
})
}

// This ledger entry is being deleted: return true
Some(SetUpdateOrDelete::Delete) => true,
// This ledger entry is being deleted
Some(SetUpdateOrDelete::Delete) => {
// If the key is set and it has no previous value, it has not changed
// All other cases should return true
!(key.is_some() && prev_val.is_none())
}

// This ledger entry is not being changed.
None => false,
Expand Down
Loading