Skip to content

Commit

Permalink
chore(broker): correct function names
Browse files Browse the repository at this point in the history
  • Loading branch information
GMKrieger committed Aug 13, 2024
1 parent 01f6cad commit 2731d76
Show file tree
Hide file tree
Showing 18 changed files with 393 additions and 439 deletions.
92 changes: 41 additions & 51 deletions offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ pub enum BrokerFacadeError {
))]
InvalidIndexes { expected: u128, got: u128 },

#[snafu(display("failed to consume input event"))]
ConsumeError { source: BrokerError },

#[snafu(display(
"failed to find finish epoch input event epoch={}",
epoch
Expand Down Expand Up @@ -73,13 +70,16 @@ impl BrokerFacade {
})
}

/// Consume rollups input event
/// Read rollups input event
#[tracing::instrument(level = "trace", skip_all)]
pub async fn consume_input(&mut self) -> Result<RollupsInput> {
tracing::trace!(self.last_id, "consuming rollups input event");
pub async fn read_input(&mut self) -> Result<RollupsInput> {
tracing::trace!(
self.last_id,
"reading rollups input event from stream"
);
let event = self
.client
.consume_blocking(&self.inputs_stream, &self.last_id)
.read_blocking(&self.inputs_stream, &self.last_id)
.await
.context(BrokerInternalSnafu)?;
if event.payload.parent_id != self.last_id {
Expand All @@ -93,9 +93,9 @@ impl BrokerFacade {
}
}

/// Produce the rollups claim if it isn't in the stream yet
/// Add the rollups claim on redis if it isn't in the stream yet
#[tracing::instrument(level = "trace", skip_all)]
pub async fn produce_rollups_claim(
pub async fn add_rollups_claim(
&mut self,
rollups_claim: RollupsClaim,
) -> Result<()> {
Expand All @@ -105,26 +105,26 @@ impl BrokerFacade {

tracing::trace!(rollups_claim.epoch_index,
?rollups_claim.epoch_hash,
"producing rollups claim"
"adding rollups claim to stream"
);

let last_claim_event = self
.client
.peek_latest(&self.claims_stream)
.read_latest(&self.claims_stream)
.await
.context(BrokerInternalSnafu)?;

let should_enqueue_claim = match last_claim_event {
let should_add_claim = match last_claim_event {
Some(event) => {
let last_claim = event.payload;
tracing::trace!(?last_claim, "got last claim from Redis");
let should_enqueue_claim =
let should_add_claim =
rollups_claim.epoch_index > last_claim.epoch_index;

// If this happens, then something is wrong with the dispatcher.
let invalid_indexes =
rollups_claim.first_index != last_claim.last_index + 1;
if should_enqueue_claim && invalid_indexes {
if should_add_claim && invalid_indexes {
tracing::debug!("rollups_claim.first_index = {}, last_claim.last_index = {}",
rollups_claim.first_index, last_claim.last_index);
return Err(BrokerFacadeError::InvalidIndexes {
Expand All @@ -133,35 +133,35 @@ impl BrokerFacade {
});
};

should_enqueue_claim
should_add_claim
}
None => {
tracing::trace!("no claims in the stream");
true
}
};

if should_enqueue_claim {
if should_add_claim {
self.client
.produce(&self.claims_stream, rollups_claim)
.add(&self.claims_stream, rollups_claim)
.await
.context(BrokerInternalSnafu)?;
}

Ok(())
}

/// Produce outputs to the rollups-outputs stream
/// Add outputs to the rollups-outputs stream
#[tracing::instrument(level = "trace", skip_all)]
pub async fn produce_outputs(
pub async fn add_outputs(
&mut self,
outputs: Vec<RollupsOutput>,
) -> Result<()> {
tracing::trace!(?outputs, "producing rollups outputs");
tracing::trace!(?outputs, "adding rollups outputs to stream");

for output in outputs {
self.client
.produce(&self.outputs_stream, output)
.add(&self.outputs_stream, output)
.await
.context(BrokerInternalSnafu)?;
}
Expand Down Expand Up @@ -196,7 +196,7 @@ mod tests {
};
let config = BrokerConfig {
redis_endpoint: fixture.redis_endpoint().to_owned(),
consume_timeout: 10,
read_timeout: 10,
backoff,
};
let facade = BrokerFacade::new(config, dapp_metadata, false)
Expand All @@ -207,7 +207,7 @@ mod tests {
}

#[test_log::test(tokio::test)]
async fn test_it_consumes_inputs() {
async fn test_it_reads_inputs() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let inputs = vec![
Expand All @@ -233,10 +233,10 @@ mod tests {
];
let mut ids = Vec::new();
for input in inputs.iter() {
ids.push(state.fixture.produce_input_event(input.clone()).await);
ids.push(state.fixture.add_input_event(input.clone()).await);
}
assert_eq!(
state.facade.consume_input().await.unwrap(),
state.facade.read_input().await.unwrap(),
RollupsInput {
parent_id: INITIAL_ID.to_owned(),
epoch_index: 0,
Expand All @@ -245,7 +245,7 @@ mod tests {
},
);
assert_eq!(
state.facade.consume_input().await.unwrap(),
state.facade.read_input().await.unwrap(),
RollupsInput {
parent_id: ids[0].clone(),
epoch_index: 0,
Expand All @@ -254,7 +254,7 @@ mod tests {
},
);
assert_eq!(
state.facade.consume_input().await.unwrap(),
state.facade.read_input().await.unwrap(),
RollupsInput {
parent_id: ids[1].clone(),
epoch_index: 1,
Expand All @@ -265,7 +265,7 @@ mod tests {
}

#[test_log::test(tokio::test)]
async fn test_it_does_not_produce_claim_when_it_was_already_produced() {
async fn test_it_does_not_add_claim_when_it_was_already_added() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim = RollupsClaim {
Expand All @@ -275,23 +275,17 @@ mod tests {
first_index: 0,
last_index: 6,
};
state
.fixture
.produce_rollups_claim(rollups_claim.clone())
.await;
state.fixture.add_rollups_claim(rollups_claim.clone()).await;
state
.facade
.produce_rollups_claim(rollups_claim.clone())
.add_rollups_claim(rollups_claim.clone())
.await
.unwrap();
assert_eq!(
state.fixture.consume_all_claims().await,
vec![rollups_claim]
);
assert_eq!(state.fixture.read_all_claims().await, vec![rollups_claim]);
}

#[test_log::test(tokio::test)]
async fn test_it_produces_claims() {
async fn test_it_adds_claims() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim0 = RollupsClaim {
Expand All @@ -310,16 +304,16 @@ mod tests {
};
state
.facade
.produce_rollups_claim(rollups_claim0.clone())
.add_rollups_claim(rollups_claim0.clone())
.await
.unwrap();
state
.facade
.produce_rollups_claim(rollups_claim1.clone())
.add_rollups_claim(rollups_claim1.clone())
.await
.unwrap();
assert_eq!(
state.fixture.consume_all_claims().await,
state.fixture.read_all_claims().await,
vec![rollups_claim0, rollups_claim1]
);
}
Expand All @@ -344,12 +338,10 @@ mod tests {
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.add_rollups_claim(rollups_claim1.clone())
.await;
let result =
state.facade.add_rollups_claim(rollups_claim2.clone()).await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
Expand Down Expand Up @@ -381,12 +373,10 @@ mod tests {
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.add_rollups_claim(rollups_claim1.clone())
.await;
let result =
state.facade.add_rollups_claim(rollups_claim2.clone()).await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
Expand Down
45 changes: 21 additions & 24 deletions offchain/advance-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ pub enum RunnerError {
#[snafu(display("failed to find finish epoch input event"))]
FindFinishEpochInputError { source: BrokerFacadeError },

#[snafu(display("failed to consume input from broker"))]
ConsumeInputError { source: BrokerFacadeError },
#[snafu(display("failed to read input from broker"))]
ReadInputError { source: BrokerFacadeError },

#[snafu(display("failed to get whether claim was produced"))]
#[snafu(display("failed to get whether claim was added to stream"))]
PeekClaimError { source: BrokerFacadeError },

#[snafu(display("failed to produce claim in broker"))]
ProduceClaimError { source: BrokerFacadeError },
#[snafu(display("failed to add claim to stream in broker"))]
AddClaimError { source: BrokerFacadeError },

#[snafu(display("failed to produce outputs in broker"))]
ProduceOutputsError { source: BrokerFacadeError },
#[snafu(display("failed to add outputs to stream in broker"))]
AddOutputsError { source: BrokerFacadeError },
}

type Result<T> = std::result::Result<T, RunnerError>;
Expand All @@ -54,12 +54,9 @@ impl Runner {

tracing::info!("starting runner main loop");
loop {
let event = runner
.broker
.consume_input()
.await
.context(ConsumeInputSnafu)?;
tracing::info!(?event, "consumed input event");
let event =
runner.broker.read_input().await.context(ReadInputSnafu)?;
tracing::info!(?event, "read input event");

match event.data {
RollupsData::AdvanceStateInput(input) => {
Expand Down Expand Up @@ -88,7 +85,7 @@ impl Runner {
input_metadata: InputMetadata,
input_payload: Vec<u8>,
) -> Result<()> {
tracing::trace!("handling advance state");
tracing::info!("handling advance state");

let input_index = inputs_sent_count - 1;
let outputs = self
Expand All @@ -104,34 +101,34 @@ impl Runner {
tracing::trace!("advance state sent to server-manager");

self.broker
.produce_outputs(outputs)
.add_outputs(outputs)
.await
.context(ProduceOutputsSnafu)?;
tracing::trace!("produced outputs in broker");
.context(AddOutputsSnafu)?;
tracing::trace!("added outputs to stream in broker");

Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
async fn handle_finish(&mut self, epoch_index: u64) -> Result<()> {
tracing::trace!("handling finish");
tracing::info!("handling finish epoch");

let result = self.server_manager.finish_epoch(epoch_index).await;
tracing::trace!("finished epoch in server-manager");

match result {
Ok((rollups_claim, proofs)) => {
self.broker
.produce_outputs(proofs)
.add_outputs(proofs)
.await
.context(ProduceOutputsSnafu)?;
tracing::trace!("produced outputs in broker");
.context(AddOutputsSnafu)?;
tracing::trace!("added outputs to broker stream");

self.broker
.produce_rollups_claim(rollups_claim)
.add_rollups_claim(rollups_claim)
.await
.context(ProduceClaimSnafu)?;
tracing::info!("produced epoch claim");
.context(AddClaimSnafu)?;
tracing::info!("added epoch claim to broker stream");
}
Err(source) => {
if let ServerManagerError::EmptyEpochError { .. } = source {
Expand Down
2 changes: 1 addition & 1 deletion offchain/advance-runner/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AdvanceRunnerFixture {

let broker_config = BrokerConfig {
redis_endpoint,
consume_timeout: 100,
read_timeout: 100,
backoff: Default::default(),
};

Expand Down
Loading

0 comments on commit 2731d76

Please sign in to comment.