Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shutdown upon parentchain code update #1634

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions app-libs/parentchain-interface/src/event_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@
limitations under the License.

*/
use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, Ordering};
use itp_api_client_types::ParentchainApi;
use itp_types::parentchain::{AddedSgxEnclave, BalanceTransfer, ExtrinsicFailed, ParentchainId};
use log::warn;
use sp_runtime::DispatchError;
use substrate_api_client::SubscribeEvents;

pub fn subscribe_to_parentchain_events(api: &ParentchainApi, parentchain_id: ParentchainId) {
pub fn subscribe_to_parentchain_events(
api: &ParentchainApi,
parentchain_id: ParentchainId,
shutdown_flag: Arc<AtomicBool>,
) {
println!("[L1Event:{}] Subscribing to selected events", parentchain_id);
let mut subscription = api.subscribe_events().unwrap();
loop {
while !shutdown_flag.load(Ordering::Relaxed) {
let events = subscription.next_events_from_metadata().unwrap().unwrap();

for event in events.iter() {
Expand All @@ -49,6 +56,14 @@ pub fn subscribe_to_parentchain_events(api: &ParentchainApi, parentchain_id: Par
}
println!("[L1Event:{}] {:?}", parentchain_id, ev);
},
"CodeUpdated" => {
println!(
"[L1Event:{}] CodeUpdated. Initiating service shutdown to allow clean restart",
parentchain_id
);
shutdown_flag.store(true, Ordering::Relaxed);
},
"UpdateAuthorized" => warn!("[L1Event:{}] UpdateAuthorized", parentchain_id),
_ => continue,
},
"ParaInclusion" => continue,
Expand Down Expand Up @@ -90,4 +105,5 @@ pub fn subscribe_to_parentchain_events(api: &ParentchainApi, parentchain_id: Par
}
}
}
println!("[L1Event:{}] Subscription terminated", parentchain_id);
}
93 changes: 73 additions & 20 deletions service/src/main_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,20 @@ use itp_types::parentchain::{AccountId, Balance};
use sp_core::crypto::{AccountId32, Ss58Codec};
use sp_keyring::AccountKeyring;
use sp_runtime::MultiSigner;
use std::{fmt::Debug, path::PathBuf, str, str::Utf8Error, sync::Arc, thread, time::Duration};
use std::{
fmt::Debug,
path::PathBuf,
str,
str::Utf8Error,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use substrate_api_client::ac_node_api::{EventRecord, Phase::ApplyExtrinsic};
use tokio::runtime::Handle;
use tokio::{runtime::Handle, task::JoinHandle};

const VERSION: &str = env!("CARGO_PKG_VERSION");

Expand Down Expand Up @@ -563,6 +574,10 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
(integritee_parentchain_handler, integritee_last_synced_header_at_last_run)
};

// some of the following threads need to be shut down gracefully.
let shutdown_flag = Arc::new(AtomicBool::new(false));
let mut sensitive_threads: Vec<thread::JoinHandle<()>> = Vec::new();

match WorkerModeProvider::worker_mode() {
WorkerMode::Teeracle => {
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -593,11 +608,13 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
)
.unwrap();

start_parentchain_header_subscription_thread(
let handle = start_parentchain_header_subscription_thread(
shutdown_flag.clone(),
integritee_parentchain_handler,
last_synced_header,
*shard,
);
sensitive_threads.push(handle);

info!("skipping shard vault check because not yet supported for offchain worker");
},
Expand All @@ -617,11 +634,13 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
integritee_last_synced_header_at_last_run
};

start_parentchain_header_subscription_thread(
let handle = start_parentchain_header_subscription_thread(
shutdown_flag.clone(),
integritee_parentchain_handler,
last_synced_header,
*shard,
);
sensitive_threads.push(handle);

spawn_worker_for_shard_polling(
shard,
Expand All @@ -632,27 +651,33 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
}

let maybe_target_a_rpc_api = if let Some(url) = config.target_a_parentchain_rpc_endpoint() {
Some(init_target_parentchain(
let (api, mut handles) = init_target_parentchain(
&enclave,
&tee_accountid,
url,
shard,
ParentchainId::TargetA,
is_development_mode,
))
shutdown_flag.clone(),
);
sensitive_threads.append(&mut handles);
Some(api)
} else {
None
};

let maybe_target_b_rpc_api = if let Some(url) = config.target_b_parentchain_rpc_endpoint() {
Some(init_target_parentchain(
let (api, mut handles) = init_target_parentchain(
&enclave,
&tee_accountid,
url,
shard,
ParentchainId::TargetB,
is_development_mode,
))
shutdown_flag.clone(),
);
sensitive_threads.append(&mut handles);
Some(api)
} else {
None
};
Expand Down Expand Up @@ -688,14 +713,29 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(

if WorkerModeProvider::worker_mode() == WorkerMode::Sidechain {
println!("[Integritee:SCV] starting block production");
let last_synced_header =
sidechain_init_block_production(enclave.clone(), sidechain_storage).unwrap();
let mut handles = sidechain_init_block_production(
enclave.clone(),
sidechain_storage,
shutdown_flag.clone(),
)
.unwrap();
sensitive_threads.append(&mut handles);
}

ita_parentchain_interface::event_subscriber::subscribe_to_parentchain_events(
&integritee_rpc_api,
ParentchainId::Integritee,
shutdown_flag.clone(),
);
println!(
"[!] waiting for {} sensitive threads to shut down gracefully",
sensitive_threads.len()
);
// Join each thread to ensure they have completed
for handle in sensitive_threads {
handle.join().expect("Thread panicked");
}
println!("[!] All threads stopped gracefully.");
}

fn init_provided_shard_vault<E: EnclaveBase>(
Expand Down Expand Up @@ -754,7 +794,8 @@ fn init_target_parentchain<E>(
shard: &ShardIdentifier,
parentchain_id: ParentchainId,
is_development_mode: bool,
) -> ParentchainApi
shutdown_flag: Arc<AtomicBool>,
) -> (ParentchainApi, Vec<thread::JoinHandle<()>>)
where
E: EnclaveBase + Sidechain,
{
Expand Down Expand Up @@ -785,6 +826,8 @@ where
// we ignore failure
let _ = enclave.init_shard_creation_parentchain_header(shard, &parentchain_id, &head);

let mut handles = Vec::new();

if WorkerModeProvider::worker_mode() != WorkerMode::Teeracle {
println!(
"[{:?}] Finished initializing light client, syncing parentchain...",
Expand All @@ -796,11 +839,13 @@ where
.sync_parentchain_until_latest_finalized(last_synched_header, *shard, true)
.unwrap();

start_parentchain_header_subscription_thread(
let handle = start_parentchain_header_subscription_thread(
shutdown_flag.clone(),
parentchain_handler.clone(),
last_synched_header,
*shard,
)
);
handles.push(handle);
}

let parentchain_init_params = parentchain_handler.parentchain_init_params.clone();
Expand All @@ -812,10 +857,11 @@ where
ita_parentchain_interface::event_subscriber::subscribe_to_parentchain_events(
&node_api_clone,
parentchain_id,
shutdown_flag,
)
})
.unwrap();
node_api
(node_api, handles)
}

fn init_parentchain<E>(
Expand Down Expand Up @@ -1018,30 +1064,35 @@ fn send_integritee_extrinsic(
}

fn start_parentchain_header_subscription_thread<E: EnclaveBase + Sidechain>(
shutdown_flag: Arc<AtomicBool>,
parentchain_handler: Arc<ParentchainHandler<ParentchainApi, E>>,
last_synced_header: Header,
shard: ShardIdentifier,
) {
) -> thread::JoinHandle<()> {
let parentchain_id = *parentchain_handler.parentchain_id();
thread::Builder::new()
.name(format!("{:?}_parentchain_sync_loop", parentchain_id))
.spawn(move || {
if let Err(e) =
subscribe_to_parentchain_new_headers(parentchain_handler, last_synced_header, shard)
{
if let Err(e) = subscribe_to_parentchain_new_headers(
shutdown_flag,
parentchain_handler,
last_synced_header,
shard,
) {
error!(
"[{:?}] parentchain block syncing terminated with a failure: {:?}",
parentchain_id, e
);
}
println!("[!] [{:?}] parentchain block syncing has terminated", parentchain_id);
})
.unwrap();
.unwrap()
}

/// Subscribe to the node API finalized heads stream and trigger a parent chain sync
/// upon receiving a new header.
fn subscribe_to_parentchain_new_headers<E: EnclaveBase + Sidechain>(
shutdown_flag: Arc<AtomicBool>,
parentchain_handler: Arc<ParentchainHandler<ParentchainApi, E>>,
mut last_synced_header: Header,
shard: ShardIdentifier,
Expand All @@ -1053,7 +1104,7 @@ fn subscribe_to_parentchain_new_headers<E: EnclaveBase + Sidechain>(
.subscribe_finalized_heads()
.map_err(Error::ApiClient)?;
let parentchain_id = parentchain_handler.parentchain_id();
loop {
while !shutdown_flag.load(Ordering::Relaxed) {
let new_header = subscription
.next()
.ok_or(Error::ApiSubscriptionDisconnected)?
Expand All @@ -1070,6 +1121,8 @@ fn subscribe_to_parentchain_new_headers<E: EnclaveBase + Sidechain>(
false,
)?;
}
warn!("[{:?}] parent chain block syncing has terminated", parentchain_id);
Ok(())
}

/// Get the public signing key of the TEE.
Expand Down
18 changes: 13 additions & 5 deletions service/src/sidechain_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use its_primitives::types::block::SignedBlock as SignedSidechainBlock;
use its_storage::{interface::FetchBlocks, start_sidechain_pruning_loop, BlockPruner};
use log::*;
use sp_runtime::{traits::IdentifyAccount, MultiSigner};
use std::{sync::Arc, thread};
use std::{
sync::{atomic::AtomicBool, Arc},
thread,
};
use teerex_primitives::AnySigner;
use tokio::runtime::Handle;

Expand Down Expand Up @@ -113,7 +116,8 @@ pub(crate) fn sidechain_start_untrusted_rpc_server<SidechainStorage>(
pub(crate) fn sidechain_init_block_production<Enclave, SidechainStorage>(
enclave: Arc<Enclave>,
sidechain_storage: Arc<SidechainStorage>,
) -> ServiceResult<()>
shutdown_flag: Arc<AtomicBool>,
) -> ServiceResult<Vec<thread::JoinHandle<()>>>
where
Enclave: EnclaveBase + Sidechain,
SidechainStorage: BlockPruner + FetchBlocks<SignedSidechainBlock> + Sync + Send + 'static,
Expand All @@ -126,12 +130,14 @@ where
// Start interval sidechain block production (execution of trusted calls, sidechain block production).
let sidechain_enclave_api = enclave;
println!("[+] Spawning thread for sidechain block production");
thread::Builder::new()
let local_shutdown_flag = shutdown_flag.clone();
let block_production_handle = thread::Builder::new()
.name("interval_block_production_timer".to_owned())
.spawn(move || {
let future = start_slot_worker(
|| execute_trusted_calls(sidechain_enclave_api.as_ref()),
SLOT_DURATION,
local_shutdown_flag,
);
block_on(future);
println!("[!] Sidechain block production loop has terminated");
Expand All @@ -140,18 +146,20 @@ where

// ------------------------------------------------------------------------
// start sidechain pruning loop
thread::Builder::new()
let pruning_handle = thread::Builder::new()
.name("sidechain_pruning_loop".to_owned())
.spawn(move || {
start_sidechain_pruning_loop(
&sidechain_storage,
SIDECHAIN_PURGE_INTERVAL,
SIDECHAIN_PURGE_LIMIT,
shutdown_flag,
);
println!("[!] Sidechain block pruning loop has terminated");
})
.map_err(|e| Error::Custom(Box::new(e)))?;

Ok(())
Ok([block_production_handle, pruning_handle].into())
}

/// Execute trusted operations in the enclave.
Expand Down
6 changes: 4 additions & 2 deletions sidechain/consensus/slots/src/slot_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@
//! provides generic functionality for slots.

use crate::time_until_next_slot;
use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, Ordering};
use futures_timer::Delay;
use std::time::Duration;

/// Executes given `task` repeatedly when the next slot becomes available.
pub async fn start_slot_worker<F>(task: F, slot_duration: Duration)
pub async fn start_slot_worker<F>(task: F, slot_duration: Duration, shutdown_flag: Arc<AtomicBool>)
where
F: Fn(),
{
let mut slot_stream = SlotStream::new(slot_duration);

loop {
while !shutdown_flag.load(Ordering::Relaxed) {
slot_stream.next_slot().await;
task();
}
Expand Down
Loading
Loading