Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: skip reprocessing event if already seen; also bunch of small optimizations #20

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 89 additions & 94 deletions src/aws_utils/cloudwatch_utils.rs
Original file line number Diff line number Diff line change
@@ -1,120 +1,115 @@
pub mod cloudwatch_utils {

/// Constants for dimension names and values
pub const EXECUTOR_DIMENSION: &str = "Executor";
pub const PRIORITY_EXECUTOR: &str = "PriorityExecutor";
pub const V2_EXECUTOR: &str = "V2Executor";

/// Constants for metric names
pub const TX_SUCCEEDED_METRIC: &str = "TransactionSucceeded";
pub const TX_REVERTED_METRIC: &str = "TransactionReverted";
pub const TX_SUBMITTED_METRIC: &str = "TransactionSubmitted";
pub const TX_STATUS_UNKNOWN_METRIC: &str = "TransactionStatusUnknown";

pub enum DimensionName {
Executor,
}
impl AsRef<str> for DimensionName {
fn as_ref(&self) -> &str {
match self {
DimensionName::Executor => PRIORITY_EXECUTOR,
}
/// Constants for dimension names and values
pub const EXECUTOR_DIMENSION: &str = "Executor";
pub const PRIORITY_EXECUTOR: &str = "PriorityExecutor";
pub const V2_EXECUTOR: &str = "V2Executor";

/// Constants for metric names
pub const TX_SUCCEEDED_METRIC: &str = "TransactionSucceeded";
pub const TX_REVERTED_METRIC: &str = "TransactionReverted";
pub const TX_SUBMITTED_METRIC: &str = "TransactionSubmitted";
pub const TX_STATUS_UNKNOWN_METRIC: &str = "TransactionStatusUnknown";

pub enum DimensionName {
Executor,
}
impl AsRef<str> for DimensionName {
fn as_ref(&self) -> &str {
match self {
DimensionName::Executor => PRIORITY_EXECUTOR,
}
}
}

impl Into<String> for DimensionName {
fn into(self) -> String {
match self {
DimensionName::Executor => EXECUTOR_DIMENSION.to_string(),
}
impl From<DimensionName> for String {
fn from(dimension: DimensionName) -> Self {
match dimension {
DimensionName::Executor => EXECUTOR_DIMENSION.to_string(),
Comment on lines +23 to +26
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl From gives you Into for free, the reverse not true

}
}
}

pub enum DimensionValue {
PriorityExecutor,
V2Executor,
}

impl Into<String> for DimensionValue {
fn into(self) -> String {
match self {
DimensionValue::PriorityExecutor => PRIORITY_EXECUTOR.to_string(),
DimensionValue::V2Executor => V2_EXECUTOR.to_string(),
}
pub enum DimensionValue {
PriorityExecutor,
V2Executor,
}
impl From<DimensionValue> for String {
fn from(value: DimensionValue) -> Self {
match value {
DimensionValue::PriorityExecutor => PRIORITY_EXECUTOR.to_string(),
DimensionValue::V2Executor => V2_EXECUTOR.to_string(),
}
}
}

impl AsRef<str> for DimensionValue {
fn as_ref(&self) -> &str {
match self {
DimensionValue::PriorityExecutor => PRIORITY_EXECUTOR,
DimensionValue::V2Executor => V2_EXECUTOR,
}
impl AsRef<str> for DimensionValue {
fn as_ref(&self) -> &str {
match self {
DimensionValue::PriorityExecutor => PRIORITY_EXECUTOR,
DimensionValue::V2Executor => V2_EXECUTOR,
}
}
}

pub enum CwMetrics {
TxSucceeded,
TxReverted,
TxSubmitted,
TxStatusUnknown,
}

impl Into<String> for CwMetrics {
fn into(self) -> String {
match self {
CwMetrics::TxSucceeded => TX_SUCCEEDED_METRIC.to_string(),
CwMetrics::TxReverted => TX_REVERTED_METRIC.to_string(),
CwMetrics::TxSubmitted => TX_SUBMITTED_METRIC.to_string(),
CwMetrics::TxStatusUnknown => TX_STATUS_UNKNOWN_METRIC.to_string(),
}
pub enum CwMetrics {
TxSucceeded,
TxReverted,
TxSubmitted,
TxStatusUnknown,
}
impl From<CwMetrics> for String {
fn from(metric: CwMetrics) -> Self {
match metric {
CwMetrics::TxSucceeded => TX_SUCCEEDED_METRIC.to_string(),
CwMetrics::TxReverted => TX_REVERTED_METRIC.to_string(),
CwMetrics::TxSubmitted => TX_SUBMITTED_METRIC.to_string(),
CwMetrics::TxStatusUnknown => TX_STATUS_UNKNOWN_METRIC.to_string(),
}
}
}

use aws_sdk_cloudwatch::types::Dimension;
use aws_sdk_cloudwatch::types::Dimension;

pub const ARTEMIS_NAMESPACE: &str = "Artemis";
pub const ARTEMIS_NAMESPACE: &str = "Artemis";

pub struct MetricBuilder {
metric_name: String,
dimensions: Vec<Dimension>,
value: f64,
}
pub struct MetricBuilder {
metric_name: String,
dimensions: Vec<Dimension>,
value: f64,
}

impl MetricBuilder {
pub fn new(metric: CwMetrics) -> Self {
Self {
metric_name: metric.into(),
dimensions: Vec::new(),
value: 1.0,
}
impl MetricBuilder {
pub fn new(metric: CwMetrics) -> Self {
Self {
metric_name: metric.into(),
dimensions: Vec::new(),
value: 1.0,
}
}

pub fn add_dimension(mut self, name: &str, value: &str) -> Self {
self.dimensions
.push(Dimension::builder().name(name).value(value).build());
self
}
pub fn add_dimension(mut self, name: &str, value: &str) -> Self {
self.dimensions
.push(Dimension::builder().name(name).value(value).build());
self
}

pub fn with_value(mut self, value: f64) -> Self {
self.value = value;
self
}
pub fn with_value(mut self, value: f64) -> Self {
self.value = value;
self
}

pub fn build(self) -> aws_sdk_cloudwatch::types::MetricDatum {
aws_sdk_cloudwatch::types::MetricDatum::builder()
.metric_name(self.metric_name)
.value(self.value)
.set_dimensions(Some(self.dimensions))
.build()
}
pub fn build(self) -> aws_sdk_cloudwatch::types::MetricDatum {
aws_sdk_cloudwatch::types::MetricDatum::builder()
.metric_name(self.metric_name)
.value(self.value)
.set_dimensions(Some(self.dimensions))
.build()
}
}

pub fn receipt_status_to_metric(status: u64) -> CwMetrics {
match status {
1 => CwMetrics::TxSucceeded,
0 => CwMetrics::TxReverted,
_ => CwMetrics::TxStatusUnknown,
}
pub fn receipt_status_to_metric(status: u64) -> CwMetrics {
match status {
1 => CwMetrics::TxSucceeded,
0 => CwMetrics::TxReverted,
_ => CwMetrics::TxStatusUnknown,
}
}
21 changes: 7 additions & 14 deletions src/collectors/uniswapx_order_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ impl fmt::Display for OrderTypeError {

impl std::error::Error for OrderTypeError {}

#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub enum OrderType {
DutchV2,
#[default]
Priority,
}

Expand All @@ -44,21 +45,15 @@ impl FromStr for OrderType {
}
}

impl ToString for OrderType {
fn to_string(&self) -> String {
impl fmt::Display for OrderType {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl Display gives you ToString for free

fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OrderType::DutchV2 => "Dutch_V2".to_string(),
OrderType::Priority => "Priority".to_string(),
OrderType::DutchV2 => write!(f, "Dutch_V2"),
OrderType::Priority => write!(f, "Priority"),
}
}
}

impl Default for OrderType {
fn default() -> Self {
OrderType::DutchV2
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct UniswapXOrder {
#[serde(rename = "encodedOrder")]
Expand Down Expand Up @@ -109,9 +104,7 @@ impl Collector<UniswapXOrder> for UniswapXOrderCollector {
async fn get_event_stream(&self) -> Result<CollectorStream<'_, UniswapXOrder>> {
let url = format!(
"{}/orders?orderStatus=open&chainId={}&orderType={}",
self.base_url,
self.chain_id,
self.order_type.to_string()
self.base_url, self.chain_id, self.order_type,
);

// stream that polls the UniswapX API every 5 seconds
Expand Down
2 changes: 1 addition & 1 deletion src/collectors/uniswapx_route_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,5 @@ fn resolve_address(token: String) -> String {
if token == "0x0000000000000000000000000000000000000000" {
return "ETH".to_string();
}
return token;
token
}
2 changes: 1 addition & 1 deletion src/executors/public_1559_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ethers::{
};

use crate::{
aws_utils::cloudwatch_utils::cloudwatch_utils::{
aws_utils::cloudwatch_utils::{
receipt_status_to_metric, CwMetrics, DimensionName, DimensionValue, MetricBuilder,
ARTEMIS_NAMESPACE,
},
Expand Down
4 changes: 4 additions & 0 deletions src/strategies/keystore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ impl KeyStore {
pub fn len(&self) -> usize {
self.keys.len()
}

pub fn is_empty(&self) -> bool {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub fn len() and pub fn is_empty() often go hand-in-hand to provide the same interface as std types

self.keys.is_empty()
}
}

#[cfg(test)]
Expand Down
19 changes: 10 additions & 9 deletions src/strategies/priority_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ pub struct ExecutionMetadata {
}

impl ExecutionMetadata {
pub fn new(quote: U256, amount_out_required: U256, order_hash: &String) -> Self {
pub fn new(quote: U256, amount_out_required: U256, order_hash: &str) -> Self {
Self {
quote,
amount_out_required,
order_hash: order_hash.clone(),
order_hash: order_hash.to_owned(),
}
}

Expand Down Expand Up @@ -160,18 +160,19 @@ impl<M: Middleware + 'static> UniswapXPriorityFill<M> {
}

fn decode_order(&self, encoded_order: &str) -> Result<PriorityOrder, Box<dyn Error>> {
let encoded_order = if encoded_order.starts_with("0x") {
&encoded_order[2..]
let encoded_order = if let Some(stripped) = encoded_order.strip_prefix("0x") {
stripped
} else {
encoded_order
};
let order_hex = hex::decode(encoded_order)?;

Ok(PriorityOrder::decode_inner(&order_hex, false)?)
PriorityOrder::decode_inner(&order_hex, false)
}

async fn process_order_event(&mut self, event: &UniswapXOrder) -> Option<Action> {
if self.last_block_timestamp == 0 {
if self.last_block_timestamp == 0 || self.open_orders.get(&event.order_hash).is_some() {
info!("{} - skipping processing order event", event.order_hash);
return None;
}

Expand Down Expand Up @@ -210,7 +211,7 @@ impl<M: Middleware + 'static> UniswapXPriorityFill<M> {
..
} = &event.request;

if let Some(metadata) = self.get_execution_metadata(&event) {
if let Some(metadata) = self.get_execution_metadata(event) {
info!(
"{} - Sending trade: num trades: {} routed quote: {}, batch needs: {}",
metadata.order_hash,
Expand Down Expand Up @@ -373,7 +374,7 @@ impl<M: Middleware + 'static> UniswapXPriorityFill<M> {
info!("{} - Order already done, skipping", order_hash);
return;
}
if let Some(_) = self.get_open_order(&order_hash) {
if self.get_open_order(&order_hash).is_some() {
info!("{} - updating order", order_hash);
self.update_open_order(&order_hash, |existing_order| {
existing_order.resolved = resolved_order;
Expand Down Expand Up @@ -434,7 +435,7 @@ impl<M: Middleware + 'static> UniswapXPriorityFill<M> {
}
}

async fn send_order_if_open(&self, order_hash: &String) -> Result<()> {
async fn send_order_if_open(&self, order_hash: &str) -> Result<()> {
if let Some(order_data) = self.get_open_order(order_hash) {
let order_batch = self.get_order_batch(&order_data);
self.batch_sender.send(vec![order_batch]).await?;
Expand Down
Loading