Skip to content

Commit

Permalink
feat: implement missing filter parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
aoudiamoncef committed Oct 9, 2023
1 parent b9a93dc commit 8562f5f
Showing 1 changed file with 83 additions and 41 deletions.
124 changes: 83 additions & 41 deletions massa-grpc/src/stream/new_slot_execution_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ use crate::server::MassaPublicGrpc;
use crate::SlotRange;
use futures_util::StreamExt;
use massa_execution_exports::{ExecutionOutput, SlotExecutionOutput};
use massa_models::address::Address;
use massa_models::operation::OperationId;
use massa_models::slot::Slot;
use massa_proto_rs::massa::api::v1::{self as grpc_api, NewSlotExecutionOutputsRequest};
use massa_proto_rs::massa::model::v1::{self as grpc_model};
use std::collections::HashSet;
use std::io::ErrorKind;
use std::pin::Pin;
use std::str::FromStr;
use tokio::select;
use tonic::{Request, Streaming};
use tracing::log::{error, warn};
Expand All @@ -26,7 +29,6 @@ pub type NewSlotExecutionOutputsStreamType = Pin<
>,
>;

//TODO implement remaining sub filters
// Type declaration for NewSlotExecutionOutputsFilter
#[derive(Clone, Debug, Default)]
struct Filter {
Expand All @@ -50,6 +52,17 @@ struct Filter {
struct AsyncPoolChangesFilter {
// Do not return any message
none: Option<()>,
// The type of the change
change_type: Option<i32>,
// The handler function name within the destination address bytecode
handler: Option<String>,
// The address towards which the message is being sent
destination_address: Option<Address>,
// The address that sent the message
emitter_address: Option<Address>,
// Boolean that determine if the message can be executed. For messages without filter this boolean is always true.
// For messages with filter, this boolean is true if the filter has been matched between `validity_start` and current slot.
can_be_executed: Option<bool>,
}

#[derive(Clone, Debug, Default)]
Expand All @@ -62,18 +75,30 @@ struct ExecutedDenounciationFilter {
struct ExecutionEventFilter {
// Do not return any message
none: Option<()>,
// Caller address
caller_address: Option<Address>,
// Emitter address
emitter_address: Option<Address>,
// Original operation id
original_operation_id: Option<OperationId>,
// Whether the event is a failure
is_failure: Option<bool>,
}

#[derive(Clone, Debug, Default)]
struct ExecutedOpsChangesFilter {
// Do not return any message
none: Option<()>,
// Operation id
operation_id: Option<OperationId>,
}

#[derive(Clone, Debug, Default)]
struct LedgerChangesFilter {
// Do not return any message
none: Option<()>,
// Address for which we have ledger changes
address: Option<Address>,
}

/// Creates a new stream of new produced and received slot execution outputs
Expand Down Expand Up @@ -240,75 +265,92 @@ fn get_filter(
},
grpc_api::new_slot_execution_outputs_filter::Filter::AsyncPoolChangesFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = AsyncPoolChangesFilter::default();
match filter {
grpc_api::async_pool_changes_filter::Filter::None(_) => {
async_pool_changes_filter = Some(AsyncPoolChangesFilter {
none: Some(()),
});
nested_filter.none = Some(());
},
grpc_api::async_pool_changes_filter::Filter::Type(change_type) => {
nested_filter.change_type = Some(change_type);
},
grpc_api::async_pool_changes_filter::Filter::Handler(function) => {
nested_filter.handler = Some(function);
},
grpc_api::async_pool_changes_filter::Filter::DestinationAddress(addr) => {
nested_filter.destination_address = Some(Address::from_str(&addr)?);
},
grpc_api::async_pool_changes_filter::Filter::EmitterAddress(addr) => {
nested_filter.emitter_address = Some(Address::from_str(&addr)?);
},
grpc_api::async_pool_changes_filter::Filter::CanBeExecuted(can_be_executed) => {
nested_filter.can_be_executed = Some(can_be_executed);
},
_ => {
async_pool_changes_filter = Some(AsyncPoolChangesFilter {
none: None,
})
}
async_pool_changes_filter = Some(nested_filter);
}
}
},
},
grpc_api::new_slot_execution_outputs_filter::Filter::ExecutedDenounciationFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = ExecutedDenounciationFilter::default();
match filter {
grpc_api::executed_denounciation_filter::Filter::None(_) => {
executed_denounciation_filter = Some(ExecutedDenounciationFilter {
none: Some(()),
});
nested_filter.none = Some(());
},
}
executed_denounciation_filter = Some(nested_filter);
}
}},
},
grpc_api::new_slot_execution_outputs_filter::Filter::EventFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = ExecutionEventFilter::default();
match filter {
grpc_api::execution_event_filter::Filter::None(_) => {
execution_event_filter = Some(ExecutionEventFilter {
none: Some(()),
});
nested_filter.none = Some(());
},
grpc_api::execution_event_filter::Filter::CallerAddress(addr) => {
nested_filter.caller_address = Some(Address::from_str(&addr)?);
},
grpc_api::execution_event_filter::Filter::EmitterAddress(addr) => {
nested_filter.emitter_address = Some(Address::from_str(&addr)?);
},
grpc_api::execution_event_filter::Filter::OriginalOperationId(op) => {
nested_filter.original_operation_id = Some(OperationId::from_str(&op)?);
},
grpc_api::execution_event_filter::Filter::IsFailure(is_failure) => {
nested_filter.is_failure = Some(is_failure);
},
_ => {
execution_event_filter = Some(ExecutionEventFilter {
none: None,
})
}
}
}},
execution_event_filter = Some(nested_filter);
}
},
grpc_api::new_slot_execution_outputs_filter::Filter::ExecutedOpsChangesFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = ExecutedOpsChangesFilter::default();
match filter {
grpc_api::executed_ops_changes_filter::Filter::None(_) => {
executed_ops_changes_filter = Some(ExecutedOpsChangesFilter {
none: Some(()),
});
nested_filter.none = Some(());
},
grpc_api::executed_ops_changes_filter::Filter::OperationId(op_id) => {
nested_filter.operation_id = Some(OperationId::from_str(&op_id)?);
},
_ => {
executed_ops_changes_filter = Some(ExecutedOpsChangesFilter {
none: None,
})
}
}
}},
executed_ops_changes_filter = Some(nested_filter);
}
},
grpc_api::new_slot_execution_outputs_filter::Filter::LedgerChangesFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = LedgerChangesFilter::default();
match filter {
grpc_api::ledger_changes_filter::Filter::None(_) => {
ledger_changes_filter = Some(LedgerChangesFilter {
none: Some(()),
});
nested_filter.none = Some(());
},
grpc_api::ledger_changes_filter::Filter::Address(addr) => {
nested_filter.address = Some(Address::from_str(&addr)?);
},
_ => {
ledger_changes_filter = Some(LedgerChangesFilter {
none: None,
})
}
}
}},
ledger_changes_filter = Some(nested_filter);
}
},
}
}
}
Expand Down

0 comments on commit 8562f5f

Please sign in to comment.