Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ApiHeadChange type #4276

Closed
wants to merge 14 commits into from
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@
- [#4267](https://github.com/ChainSafe/forest/pull/4267) Fixed potential panics
in `forest-tool api compare`.

- [#4276](https://github.com/ChainSafe/forest/pull/4276) Fix schema bug in the
`Filecoin.ChainNotify` RPC method.

## Forest 0.17.2 "Dovakhin"

This is a **mandatory** release for all mainnet node operators. It changes the
Expand Down
64 changes: 63 additions & 1 deletion src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use std::fmt::{self, Debug};
use std::time::Duration;

use anyhow::bail;
use futures::StreamExt;
use http02::{header, HeaderMap, HeaderValue};
use jsonrpsee::core::client::ClientT as _;
use jsonrpsee::core::client::{ClientT as _, SubscriptionClientT};
use jsonrpsee::core::params::{ArrayParams, ObjectParams};
use jsonrpsee::core::ClientError;
use once_cell::sync::Lazy;
Expand All @@ -25,6 +26,8 @@ use url::Url;

use super::{ApiVersion, Request, MAX_REQUEST_BODY_SIZE, MAX_RESPONSE_BODY_SIZE};

const DEFAULT_NOTIFICATION_SAMPLES: usize = 2;

/// A JSON-RPC client that can dispatch either a [`crate::rpc::Request`] to a single URL.
pub struct Client {
/// SHOULD end in a slash, due to our use of [`Url::join`].
Expand Down Expand Up @@ -129,6 +132,65 @@ impl Client {
};
work.instrument(span.or_current()).await
}
pub async fn subscribe<T: crate::lotus_json::HasLotusJson + std::fmt::Debug>(
&self,
req: Request<T>,
) -> Result<Vec<T>, ClientError> {
let Request {
method_name,
params,
api_version,
..
} = req;

let client = self.get_or_init_client(api_version).await?;

let subscription = match params {
serde_json::Value::Null => {
client.subscribe::<T::LotusJson, _>(method_name, ArrayParams::new(), "xrpc.cancel")
}
serde_json::Value::Array(it) => {
let mut params = ArrayParams::new();
for param in it {
params.insert(param)?
}
trace_params(params.clone());
client.subscribe(method_name, params, "xrpc.cancel")
}
serde_json::Value::Object(it) => {
let mut params = ObjectParams::new();
for (name, param) in it {
params.insert(&name, param)?
}
trace_params(params.clone());
client.subscribe(method_name, params, "xrpc.cancel")
}
prim @ (serde_json::Value::Bool(_)
| serde_json::Value::Number(_)
| serde_json::Value::String(_)) => {
return Err(ClientError::Custom(format!(
"invalid parameter type: `{}`",
prim
)))
}
}
.await?;

subscription
.take(DEFAULT_NOTIFICATION_SAMPLES)
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| {
let result = match r {
Ok(it) => Ok(T::from_lotus_json(it)),
Err(e) => Err(e),
};
debug!(?result);
result
})
.collect()
}
async fn get_or_init_client(&self, version: ApiVersion) -> Result<&UrlClient, ClientError> {
match version {
ApiVersion::V0 => &self.v0,
Expand Down
22 changes: 15 additions & 7 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,21 +642,27 @@ pub(crate) fn chain_notify<DB: Blockstore>(

// As soon as the channel is created, send the current tipset
let current = data.chain_store.heaviest_tipset();
let (change, headers) = ("current".into(), current.block_headers().clone().into());
let (change, tipset) = ("current".into(), current);
sender
.send(vec![ApiHeadChange { change, headers }])
.send(vec![ApiHeadChange {
change,
tipset: tipset.as_ref().clone(),
}])
.expect("receiver is not dropped");

let mut subscriber = data.chain_store.publisher().subscribe();

tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let (change, headers) = match v {
HeadChange::Apply(ts) => ("apply".into(), ts.block_headers().clone().into()),
let (change, tipset) = match v {
HeadChange::Apply(ts) => ("apply".into(), ts),
};

if sender
.send(vec![ApiHeadChange { change, headers }])
.send(vec![ApiHeadChange {
change,
tipset: tipset.as_ref().clone(),
}])
.is_err()
{
break;
Expand Down Expand Up @@ -759,14 +765,16 @@ pub struct ChainExportParams {
}
lotus_json_with_self!(ChainExportParams);

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "PascalCase")]
pub struct ApiHeadChange {
#[serde(rename = "Type")]
pub change: String,
#[serde(rename = "Val", with = "crate::lotus_json")]
pub headers: Vec<CachingBlockHeader>,
#[schemars(with = "LotusJson<Tipset>")]
pub tipset: Tipset,
}
lotus_json_with_self!(ApiHeadChange);

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "snake_case")]
Expand Down
6 changes: 5 additions & 1 deletion src/rpc/request.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::ApiVersion;
use super::{chain::CHAIN_NOTIFY, ApiVersion};
use jsonrpsee::core::traits::ToRpcParams;
use std::{marker::PhantomData, time::Duration};

Expand Down Expand Up @@ -36,6 +36,10 @@ impl<T> Request<T> {
timeout: self.timeout,
}
}

pub fn is_subscription_method(&self) -> bool {
matches!(self.method_name, CHAIN_NOTIFY)
}
}

impl<T> ToRpcParams for Request<T> {
Expand Down
Loading
Loading