Skip to content

Commit

Permalink
Merge pull request #1244 from input-output-hk/ensemble/1127-fix-recor…
Browse files Browse the repository at this point in the history
…d-statistics

Fix record download statistics
  • Loading branch information
jpraynaud authored Sep 21, 2023
2 parents 54faa24 + 560781b commit 7d5a683
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 21 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.3.94"
version = "0.3.95"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
14 changes: 7 additions & 7 deletions mithril-aggregator/src/http_server/routes/statistics_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@ fn post_statistics(
mod handlers {
use std::{convert::Infallible, sync::Arc};

use mithril_common::messages::SnapshotMessage;
use mithril_common::messages::SnapshotDownloadMessage;
use reqwest::StatusCode;

use crate::event_store::{EventMessage, TransmitterService};
use crate::http_server::routes::reply;

pub async fn post_snapshot_statistics(
snapshot_message: SnapshotMessage,
snapshot_download_message: SnapshotDownloadMessage,
event_transmitter: Arc<TransmitterService<EventMessage>>,
) -> Result<impl warp::Reply, Infallible> {
let headers: Vec<(&str, &str)> = Vec::new();

match event_transmitter.send_event_message(
"HTTP::statistics",
"snapshot_downloaded",
&snapshot_message,
&snapshot_download_message,
headers,
) {
Err(e) => Ok(reply::internal_server_error(e)),
Expand All @@ -54,7 +54,7 @@ mod handlers {
mod tests {
use super::*;

use mithril_common::messages::SnapshotMessage;
use mithril_common::messages::SnapshotDownloadMessage;
use mithril_common::test_utils::apispec::APISpec;

use warp::{http::Method, test::request};
Expand Down Expand Up @@ -82,14 +82,14 @@ mod tests {
let mut builder = DependenciesBuilder::new(config);
let mut rx = builder.get_event_transmitter_receiver().await.unwrap();
let dependency_manager = builder.build_dependency_container().await.unwrap();
let snapshot_message = SnapshotMessage::dummy();
let snapshot_download_message = SnapshotDownloadMessage::dummy();

let method = Method::POST.as_str();
let path = "/statistics/snapshot";

let response = request()
.method(method)
.json(&snapshot_message)
.json(&snapshot_download_message)
.path(&format!("/{SERVER_BASE_PATH}{path}"))
.reply(&setup_router(Arc::new(dependency_manager)))
.await;
Expand All @@ -99,7 +99,7 @@ mod tests {
method,
path,
"application/json",
&snapshot_message,
&snapshot_download_message,
&response,
);

Expand Down
2 changes: 1 addition & 1 deletion mithril-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-client"
version = "0.4.1"
version = "0.4.2"
description = "A Mithril Client"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions mithril-client/src/aggregator_client/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ impl AggregatorHTTPClient {
/// Issue a POST HTTP request.
#[async_recursion]
async fn post(&self, url: &str, json: &str) -> Result<Response, AggregatorHTTPClientError> {
let request_builder = Client::new().post(url.to_owned()).json(json);
debug!("POST url='{url}' json='{json}'.");
let request_builder = Client::new().post(url.to_owned()).body(json.to_owned());
let current_api_version = self
.compute_current_api_version()
.await
Expand All @@ -177,7 +178,7 @@ impl AggregatorHTTPClient {
})?;

match response.status() {
StatusCode::OK => Ok(response),
StatusCode::OK | StatusCode::CREATED => Ok(response),
StatusCode::PRECONDITION_FAILED => {
if self.discard_current_api_version().await.is_some()
&& !self.api_versions.read().await.is_empty()
Expand Down
13 changes: 9 additions & 4 deletions mithril-client/src/aggregator_client/snapshot_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use thiserror::Error;

use mithril_common::{
entities::Snapshot,
messages::{SnapshotListItemMessage, SnapshotListMessage, SnapshotMessage},
messages::{SnapshotListItemMessage, SnapshotListMessage, SnapshotMessage, ToMessageAdapter},
StdResult,
};

use crate::aggregator_client::AggregatorClient;
use crate::utils::DownloadProgressReporter;
use crate::{
aggregator_client::AggregatorClient, message_adapters::ToSnapshotDownloadMessageAdapter,
};

/// Error for the Snapshot client
#[derive(Error, Debug)]
Expand Down Expand Up @@ -77,7 +79,9 @@ impl SnapshotClient {
{
Ok(()) => {
// the snapshot download does not fail if the statistic call fails.
let _ = self.add_statistics(snapshot).await;
if let Err(e) = self.add_statistics(snapshot).await {
warn!("Could not POST snapshot download statistics: {e:?}");
}

Ok(())
}
Expand All @@ -101,7 +105,8 @@ impl SnapshotClient {
/// Increments Aggregator's download statistics
pub async fn add_statistics(&self, snapshot: &Snapshot) -> StdResult<()> {
let url = "statistics/snapshot";
let json = serde_json::to_string(snapshot)?;
let snapshot_download_message = ToSnapshotDownloadMessageAdapter::adapt(snapshot);
let json = serde_json::to_string(&snapshot_download_message)?;
let _response = self.http_client.post_content(url, &json).await?;

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions mithril-client/src/message_adapters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod from_certificate_message_adapter;
mod from_mithril_stake_distribution_message;
mod from_snapshot_message;
mod to_snapshot_download_message;

pub use from_certificate_message_adapter::FromCertificateMessageAdapter;
pub use from_mithril_stake_distribution_message::FromMithrilStakeDistributionMessageAdapter;
pub use from_snapshot_message::FromSnapshotMessageAdapter;
pub use to_snapshot_download_message::ToSnapshotDownloadMessageAdapter;
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use mithril_common::entities::Snapshot;
use mithril_common::messages::{SnapshotDownloadMessage, ToMessageAdapter};

/// Adapter to convert [Snapshot] to [SnapshotDownloadMessage] instances
pub struct ToSnapshotDownloadMessageAdapter;

impl ToMessageAdapter<&Snapshot, SnapshotDownloadMessage> for ToSnapshotDownloadMessageAdapter {
/// Method to trigger the conversion
fn adapt(snapshot: &Snapshot) -> SnapshotDownloadMessage {
SnapshotDownloadMessage {
digest: snapshot.digest.clone(),
beacon: snapshot.beacon.clone(),
size: snapshot.size,
locations: snapshot.locations.clone(),
compression_algorithm: snapshot.compression_algorithm,
cardano_node_version: snapshot.cardano_node_version.clone(),
}
}
}

#[cfg(test)]
mod tests {
use mithril_common::test_utils::fake_data;

use super::*;

#[test]
fn adapt_ok() {
let mut snapshot = fake_data::snapshots(1)[0].to_owned();
snapshot.digest = "digest123".to_string();
let snapshot_download_message = ToSnapshotDownloadMessageAdapter::adapt(&snapshot);

assert_eq!("digest123".to_string(), snapshot_download_message.digest);
}
}
2 changes: 1 addition & 1 deletion mithril-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-common"
version = "0.2.114"
version = "0.2.115"
authors = { workspace = true }
edition = { workspace = true }
documentation = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions mithril-common/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod mithril_stake_distribution_list;
mod register_signature;
mod register_signer;
mod snapshot;
mod snapshot_download;
mod snapshot_list;

pub use certificate::CertificateMessage;
Expand All @@ -28,4 +29,5 @@ pub use mithril_stake_distribution_list::{
pub use register_signature::RegisterSignatureMessage;
pub use register_signer::RegisterSignerMessage;
pub use snapshot::SnapshotMessage;
pub use snapshot_download::SnapshotDownloadMessage;
pub use snapshot_list::{SnapshotListItemMessage, SnapshotListMessage};
88 changes: 88 additions & 0 deletions mithril-common/src/messages/snapshot_download.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use serde::{Deserialize, Serialize};

use crate::entities::{Beacon, CompressionAlgorithm, Epoch};

/// Message structure of a snapshot
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct SnapshotDownloadMessage {
/// Digest that is signed by the signer participants
pub digest: String,

/// Mithril beacon on the Cardano chain
pub beacon: Beacon,

/// Size of the snapshot file in Bytes
pub size: u64,

/// Locations where the binary content of the snapshot can be retrieved
pub locations: Vec<String>,

/// Compression algorithm of the snapshot archive
pub compression_algorithm: CompressionAlgorithm,

/// Cardano node version
pub cardano_node_version: String,
}

impl SnapshotDownloadMessage {
/// Return a dummy test entity (test-only).
pub fn dummy() -> Self {
Self {
digest: "0b9f5ad7f33cc523775c82249294eb8a1541d54f08eb3107cafc5638403ec7c6".to_string(),
beacon: Beacon {
network: "preview".to_string(),
epoch: Epoch(86),
immutable_file_number: 1728,
},
size: 807803196,
locations: vec!["https://host/certificate.tar.gz".to_string()],
compression_algorithm: CompressionAlgorithm::Gzip,
cardano_node_version: "0.0.1".to_string(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

fn golden_message_v1() -> SnapshotDownloadMessage {
SnapshotDownloadMessage {
digest: "0b9f5ad7f33cc523775c82249294eb8a1541d54f08eb3107cafc5638403ec7c6".to_string(),
beacon: Beacon {
network: "preview".to_string(),
epoch: Epoch(86),
immutable_file_number: 1728,
},
size: 807803196,
locations: vec!["https://host/certificate.tar.gz".to_string()],
compression_algorithm: CompressionAlgorithm::Gzip,
cardano_node_version: "0.0.1".to_string(),
}
}

// Test the retro compatibility with possible future upgrades.
#[test]
fn test_v1() {
let json = r#"{
"digest": "0b9f5ad7f33cc523775c82249294eb8a1541d54f08eb3107cafc5638403ec7c6",
"beacon": {
"network": "preview",
"epoch": 86,
"immutable_file_number": 1728
},
"size": 807803196,
"locations": [
"https://host/certificate.tar.gz"
],
"compression_algorithm": "gzip",
"cardano_node_version": "0.0.1"
}
"#;
let message: SnapshotDownloadMessage = serde_json::from_str(json).expect(
"This JSON is expected to be succesfully parsed into a SnapshotDownloadMessage instance.",
);

assert_eq!(golden_message_v1(), message);
}
}
58 changes: 56 additions & 2 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,12 @@ paths:
summary: Records snapshot download event
description: Records snapshot download event
requestBody:
description: Downloaded snapshot
description: Downloaded snapshot message
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/SnapshotMessage"
$ref: "#/components/schemas/SnapshotDownloadMessage"
responses:
"201":
description: Event successfully recorded
Expand Down Expand Up @@ -1148,6 +1148,60 @@ components:
"cardano_node_version": "1.0.0"
}

SnapshotDownloadMessage:
description: SnapshotDownloadMessage represents a downloaded snapshot event
type: object
additionalProperties: false
required:
- digest
- beacon
- size
- locations
- compression_algorithm
- cardano_node_version
properties:
digest:
description: Digest that is signed by the signer participants
type: string
format: bytes
beacon:
$ref: "#/components/schemas/Beacon"
size:
description: Size of the snapshot file in Bytes
type: integer
format: int64
locations:
description: Locations where the binary content of the snapshot can be retrieved
type: array
items:
type: string
compression_algorithm:
description: Compression algorithm for the snapshot archive
type: string
cardano_node_version:
description: Version of the Cardano node which is used to create snapshot archives.
type: string
example:
{
"digest": "6367ee65d0d1272e6e70736a1ea2cae34015874517f6328364f6b73930966732",
"beacon":
{
"network": "mainnet",
"epoch": 329,
"immutable_file_number": 7060000
},
"size": 26058531636,
"locations":
[
"https://mithril-cdn-us.iohk.io/snapshot/6367ee65d0d1272e6e70736a1ea2cae34015874517f6328364f6b73930966732",
"https://mithril-cdn-eu.iohk.io/snapshot/6367ee65d0d1272e6e70736a1ea2cae34015874517f6328364f6b73930966732",
"magnet:?xt=urn:sha1:YNCKHTQCWBTRNJIV4WNAE52SJUQCZO5C",
"ipfs:QmPXME1oRtoT627YKaDPDQ3PwA8tdP9rWuAAweLzqSwAWT"
],
"compression_algorithm": "zstandard",
"cardano_node_version": "1.0.0"
}

MithrilStakeDistributionListMessage:
description: MithrilStakeDistributionListMessage represents a list of Mithril stake distribution
type: array
Expand Down

0 comments on commit 7d5a683

Please sign in to comment.