Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing strategy refactoring (part 2) #5666

Merged
23 changes: 18 additions & 5 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ use sc_consensus::{
use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend};
use sc_network_sync::SyncingService;
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_service::{
build_polkadot_syncing_strategy, Configuration, NetworkStarter, SpawnTaskHandle, TaskManager,
WarpSyncConfig,
};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -425,7 +428,7 @@ pub struct BuildNetworkParams<
pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
BuildNetworkParams {
parachain_config,
net_config,
mut net_config,
client,
transaction_pool,
para_id,
Expand Down Expand Up @@ -462,7 +465,7 @@ where
IQ: ImportQueue<Block> + 'static,
Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
let warp_sync_params = match parachain_config.network.sync_mode {
let warp_sync_config = match parachain_config.network.sync_mode {
SyncMode::Warp => {
log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");

Expand Down Expand Up @@ -493,9 +496,19 @@ where
},
};
let metrics = Network::register_notification_metrics(
parachain_config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
);

let syncing_strategy = build_polkadot_syncing_strategy(
parachain_config.protocol_id(),
parachain_config.chain_spec.fork_id(),
&mut net_config,
warp_sync_config,
client.clone(),
&spawn_handle,
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

sc_service::build_network(sc_service::BuildNetworkParams {
config: parachain_config,
net_config,
Expand All @@ -504,7 +517,7 @@ where
spawn_handle,
import_queue,
block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
warp_sync_config: warp_sync_params,
syncing_strategy,
block_relay: None,
metrics,
})
Expand Down
14 changes: 12 additions & 2 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use prometheus_endpoint::Registry;
#[cfg(feature = "full-node")]
use sc_service::KeystoreContainer;
use sc_service::{RpcHandlers, SpawnTaskHandle};
use sc_service::{build_polkadot_syncing_strategy, RpcHandlers, SpawnTaskHandle};
use sc_telemetry::TelemetryWorker;
#[cfg(feature = "full-node")]
use sc_telemetry::{Telemetry, TelemetryWorkerHandle};
Expand Down Expand Up @@ -1028,6 +1028,16 @@ pub fn new_full<
})
};

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -1037,7 +1047,7 @@ pub fn new_full<
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
syncing_strategy,
block_relay: None,
metrics,
})?;
Expand Down
19 changes: 19 additions & 0 deletions prdoc/pr_5666.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
title: Make syncing strategy an argument of the syncing engine

doc:
- audience: Node Dev
description: |
Syncing strategy is no longer implicitly created when building network, but needs to be instantiated explicitly.
Previously default implementation can be created with new function `build_polkadot_syncing_strategy` or custom
syncing strategy could be implemented and used instead if desired, providing greater flexibility for chain
developers.

crates:
- name: cumulus-client-service
bump: patch
- name: polkadot-service
bump: patch
- name: sc-service
bump: major
- name: sc-network-sync
bump: major
13 changes: 12 additions & 1 deletion substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::prelude::*;
use kitchensink_runtime::RuntimeApi;
use node_primitives::Block;
use polkadot_sdk::sc_service::build_polkadot_syncing_strategy;
use sc_client_api::{Backend, BlockBackend};
use sc_consensus_babe::{self, SlotProportion};
use sc_network::{
Expand Down Expand Up @@ -506,6 +507,16 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
Vec::default(),
));

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -515,7 +526,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
syncing_strategy,
block_relay: None,
metrics,
})?;
Expand Down
77 changes: 19 additions & 58 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
block_request_handler::MAX_BLOCKS_IN_RESPONSE,
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{
self,
syncing_service::{SyncingService, ToServiceCommand},
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncConfig},
PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
warp::{EncodedProof, WarpProofRequest},
StrategyKey, SyncingAction, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand Down Expand Up @@ -189,7 +188,7 @@ pub struct Peer<B: BlockT> {

pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: PolkadotSyncingStrategy<B, Client>,
strategy: Box<dyn SyncingStrategy<B>>,

/// Blockchain client.
client: Arc<Client>,
Expand Down Expand Up @@ -271,12 +270,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Block downloader
block_downloader: Arc<dyn BlockDownloader<B>>,

/// Protocol name used to send out state requests
state_request_protocol_name: ProtocolName,

/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,

/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
}
Expand All @@ -301,35 +294,15 @@ where
protocol_id: ProtocolId,
fork_id: &Option<String>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
warp_sync_config: Option<WarpSyncConfig<B>>,
syncing_strategy: Box<dyn SyncingStrategy<B>>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_downloader: Arc<dyn BlockDownloader<B>>,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
where
N: NetworkBackend<B, <B as BlockT>::Hash>,
{
let mode = net_config.network_config.sync_mode;
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request =
if net_config.network_config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
);
MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
};
let syncing_config = SyncingConfig {
mode,
max_parallel_downloads,
max_blocks_per_request,
metrics_registry: metrics_registry.cloned(),
};
let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
net_config.network_config.default_peers_set.out_peers)
.max(1);
Expand Down Expand Up @@ -388,10 +361,6 @@ where
Arc::clone(&peer_store_handle),
);

// Initialize syncing strategy.
let strategy =
PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
let num_connected = Arc::new(AtomicUsize::new(0));
Expand All @@ -413,7 +382,7 @@ where
Self {
roles,
client,
strategy,
strategy: syncing_strategy,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
Expand Down Expand Up @@ -450,8 +419,6 @@ where
},
pending_responses: PendingResponses::new(),
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
import_queue,
},
SyncingService::new(tx, num_connected, is_major_syncing),
Expand Down Expand Up @@ -652,16 +619,16 @@ where
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, key, request } => {
self.send_state_request(peer_id, key, request);
SyncingAction::SendStateRequest { peer_id, key, protocol_name, request } => {
self.send_state_request(peer_id, key, protocol_name, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
self.send_warp_proof_request(peer_id, key, request.clone());
SyncingAction::SendWarpProofRequest { peer_id, key, protocol_name, request } => {
self.send_warp_proof_request(peer_id, key, protocol_name, request.clone());

trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1054,6 +1021,7 @@ where
&mut self,
peer_id: PeerId,
key: StrategyKey,
protocol_name: ProtocolName,
request: OpaqueStateRequest,
) {
if !self.peers.contains_key(&peer_id) {
Expand All @@ -1070,7 +1038,7 @@ where
Ok(data) => {
self.network_service.start_request(
peer_id,
self.state_request_protocol_name.clone(),
protocol_name,
data,
tx,
IfDisconnected::ImmediateError,
Expand All @@ -1089,6 +1057,7 @@ where
&mut self,
peer_id: PeerId,
key: StrategyKey,
protocol_name: ProtocolName,
request: WarpProofRequest<B>,
) {
if !self.peers.contains_key(&peer_id) {
Expand All @@ -1101,21 +1070,13 @@ where

self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
peer_id,
name.clone(),
request.encode(),
tx,
IfDisconnected::ImmediateError,
),
None => {
log::warn!(
target: LOG_TARGET,
"Trying to send warp sync request when no protocol is configured {request:?}",
);
},
}
self.network_service.start_request(
peer_id,
protocol_name,
request.encode(),
tx,
IfDisconnected::ImmediateError,
);
}

fn encode_state_request(request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
Expand Down
Loading
Loading