Skip to content

Commit

Permalink
Observe challenge period for source chain when accumulating fees (#318)
Browse files Browse the repository at this point in the history
Co-authored-by: Seun Lanlege <[email protected]>
  • Loading branch information
Wizdave97 and seunlanlege authored Sep 30, 2024
1 parent d914c77 commit 95d9be5
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 68 deletions.
9 changes: 6 additions & 3 deletions modules/hyperclient/hyperclient.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ export interface MessageDispatched {
Dispatched: bigint
}

// The possible initial states of a timeout (Post request or response) stream
// The possible initial states of a timeout (request or response) stream
export type TimeoutStreamState = "Pending" | DestinationFinalizedState | HyperbridgeVerifiedState | HyperbridgeFinalizedState;

// The possible initial states of a message status (Post request or response) stream
// The possible initial states of a message status (request or response) stream
export type MessageStatusStreamState = MessageDispatched | SourceFinalizedState | HyperbridgeVerifiedState | HyperbridgeFinalizedState;

// The possible states of an inflight request
Expand Down Expand Up @@ -337,12 +337,15 @@ export class HyperClient {
): Promise<ReadableStream<MessageStatusWithMeta>>;

/**
* Return the status of a get request as a `ReadableStream`
* Return the status of a get request as a `ReadableStream`. If the stream terminates abruptly,
* perhaps as a result of some error, it can be resumed given some initial state.
* @param {IGetRequest} request
* @param {MessageStatusStreamState} state
* @returns {Promise<ReadableStream<MessageStatusWithMeta>>}
*/
get_request_status_stream(
request: IGetRequest,
state: MessageStatusStreamState,
): Promise<ReadableStream<MessageStatusWithMeta>>;

/**
Expand Down
2 changes: 1 addition & 1 deletion modules/hyperclient/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@polytope-labs/hyperclient",
"description": "The hyperclient is a library for managing (in-flight) ISMP requests",
"version": "0.6.2",
"version": "0.6.5",
"author": "Polytope Labs ([email protected])",
"license": "Apache-2.0",
"bugs": {
Expand Down
126 changes: 64 additions & 62 deletions modules/hyperclient/src/internals/post_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,74 +658,76 @@ pub async fn timeout_post_request_stream(
let lambda = || async {
match state {
TimeoutStreamState::Pending => {
let relayer = hyperbridge_client.query_request_receipt(hash).await?;
if relayer != H160::zero() {
let height = hyperbridge_client
.query_latest_state_machine_height(dest_client.state_machine_id())
.await?;

let state_commitment = hyperbridge_client
.query_state_machine_commitment(StateMachineHeight {
id: dest_client.state_machine_id(),
height,
})
.await?;
let height = hyperbridge_client
.query_latest_state_machine_height(dest_client.state_machine_id())
.await?;

if state_commitment.timestamp > post.timeout().as_secs() {
// early return if the destination has already finalized the height
return Ok(Some((
Ok(TimeoutStatus::DestinationFinalized {
meta: Default::default(),
}),
TimeoutStreamState::DestinationFinalized(height),
)));
}
let state_commitment = hyperbridge_client
.query_state_machine_commitment(StateMachineHeight {
id: dest_client.state_machine_id(),
height,
})
.await?;

let mut stream = hyperbridge_client
.state_machine_update_notification(dest_client.state_machine_id())
.await?;
let mut valid_proof_height = None;
while let Some(event) = stream.next().await {
match event {
Ok(ev) => {
let state_machine_height = StateMachineHeight {
id: ev.event.state_machine_id,
height: ev.event.latest_height,
};
let commitment = hyperbridge_client
.query_state_machine_commitment(state_machine_height)
.await?;
if commitment.timestamp > post.timeout().as_secs() {
valid_proof_height = Some(ev);
break;
}
},
Err(e) =>
return Ok(Some((
Err(anyhow!(
"Encountered error in time out stream {e:?}"
)),
state,
))),
}
if state_commitment.timestamp > post.timeout().as_secs() {
// early return if the destination has already finalized the height
return Ok(Some((
Ok(TimeoutStatus::DestinationFinalized {
meta: Default::default(),
}),
TimeoutStreamState::DestinationFinalized(height),
)));
}

let mut stream = hyperbridge_client
.state_machine_update_notification(dest_client.state_machine_id())
.await?;
let mut valid_proof_height = None;
while let Some(event) = stream.next().await {
match event {
Ok(ev) => {
let state_machine_height = StateMachineHeight {
id: ev.event.state_machine_id,
height: ev.event.latest_height,
};
let commitment = hyperbridge_client
.query_state_machine_commitment(state_machine_height)
.await?;
if commitment.timestamp > post.timeout().as_secs() {
valid_proof_height = Some(ev);
break;
}
},
Err(e) =>
return Ok(Some((
Err(anyhow!("Encountered error in time out stream {e:?}")),
state,
))),
}
Ok(valid_proof_height.map(|ev| {
(
Ok(TimeoutStatus::DestinationFinalized { meta: ev.meta }),
TimeoutStreamState::DestinationFinalized(
ev.event.latest_height,
),
)
}))
} else {
let height = hyperbridge_client.query_latest_block_height().await?;
Ok(Some((
Ok(TimeoutStatus::HyperbridgeVerified { meta: Default::default() }),
TimeoutStreamState::HyperbridgeVerified(height),
)))
}
Ok(valid_proof_height.map(|ev| {
(
Ok(TimeoutStatus::DestinationFinalized { meta: ev.meta }),
TimeoutStreamState::DestinationFinalized(ev.event.latest_height),
)
}))
},
TimeoutStreamState::DestinationFinalized(proof_height) => {
let relayer = hyperbridge_client.query_request_receipt(hash).await?;
if relayer == H160::zero() {
// request was never delivered
let latest_height =
hyperbridge_client.client.rpc().header(None).await?.ok_or_else(
|| anyhow!("Failed to query latest hyperbridge height!"),
)?;
return Ok(Some((
Ok(TimeoutStatus::HyperbridgeVerified { meta: Default::default() }),
TimeoutStreamState::HyperbridgeVerified(
latest_height.number.into(),
),
)))
}

let storage_key = dest_client.request_receipt_full_key(hash);
let proof =
dest_client.query_state_proof(proof_height, vec![storage_key]).await?;
Expand Down
4 changes: 2 additions & 2 deletions modules/ismp/pallets/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ where
.map_err(|_| runtime_error_into_rpc_error("Error accessing state backend"))?;
let child_root = state
.storage(child_info.prefixed_storage_key().as_slice())
.map_err(|_| runtime_error_into_rpc_error("Error reading child trie root"))?
.map_err(|err| runtime_error_into_rpc_error(format!("Storage Read Error: {err:?}")))?
.map(|r| {
let mut hash = <<Block::Header as Header>::Hashing as Hash>::Output::default();

Expand All @@ -323,7 +323,7 @@ where

hash
})
.ok_or_else(|| runtime_error_into_rpc_error("Error reading child trie root"))?;
.ok_or_else(|| runtime_error_into_rpc_error("Child trie root storage returned None"))?;

let db = storage_proof.into_memory_db::<<Block::Header as Header>::Hashing>();

Expand Down
1 change: 1 addition & 0 deletions tesseract/messaging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ async fn fee_accumulation<A: IsmpProvider + Clone + Clone + HyperbridgeClaim + '
let proofs = tx_payment
.create_proof_from_receipts(source_height.into(), dest_height, source_chain.clone(), dest.clone(), receipts.clone())
.await?;
observe_challenge_period(source_chain.clone(), hyperbridge.clone(), source_height.into()).await?;
observe_challenge_period(dest.clone(), hyperbridge.clone(), dest_height).await?;
let mut commitments = vec![];
for proof in proofs {
Expand Down

0 comments on commit 95d9be5

Please sign in to comment.