Skip to content

Commit

Permalink
shutdown upon parentchain code update (#1634)
Browse files Browse the repository at this point in the history
* trigger graceful shutdown of sensitive threads upon CodeUpdated event

* fix shutdown of all threads

* clippy

* review fixes
  • Loading branch information
brenzi authored Nov 6, 2024
1 parent 5229af7 commit 98accb3
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 34 deletions.
20 changes: 18 additions & 2 deletions app-libs/parentchain-interface/src/event_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@
limitations under the License.
*/
use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, Ordering};
use itp_api_client_types::ParentchainApi;
use itp_types::parentchain::{AddedSgxEnclave, BalanceTransfer, ExtrinsicFailed, ParentchainId};
use log::warn;
use sp_runtime::DispatchError;
use substrate_api_client::SubscribeEvents;

pub fn subscribe_to_parentchain_events(api: &ParentchainApi, parentchain_id: ParentchainId) {
pub fn subscribe_to_parentchain_events(
api: &ParentchainApi,
parentchain_id: ParentchainId,
shutdown_flag: Arc<AtomicBool>,
) {
println!("[L1Event:{}] Subscribing to selected events", parentchain_id);
let mut subscription = api.subscribe_events().unwrap();
loop {
while !shutdown_flag.load(Ordering::Relaxed) {
let events = subscription.next_events_from_metadata().unwrap().unwrap();

for event in events.iter() {
Expand All @@ -49,6 +56,14 @@ pub fn subscribe_to_parentchain_events(api: &ParentchainApi, parentchain_id: Par
}
println!("[L1Event:{}] {:?}", parentchain_id, ev);
},
"CodeUpdated" => {
println!(
"[L1Event:{}] CodeUpdated. Initiating service shutdown to allow clean restart",
parentchain_id
);
shutdown_flag.store(true, Ordering::Relaxed);
},
"UpdateAuthorized" => warn!("[L1Event:{}] UpdateAuthorized", parentchain_id),
_ => continue,
},
"ParaInclusion" => continue,
Expand Down Expand Up @@ -90,4 +105,5 @@ pub fn subscribe_to_parentchain_events(api: &ParentchainApi, parentchain_id: Par
}
}
}
println!("[L1Event:{}] Subscription terminated", parentchain_id);
}
93 changes: 73 additions & 20 deletions service/src/main_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,20 @@ use itp_types::parentchain::{AccountId, Balance};
use sp_core::crypto::{AccountId32, Ss58Codec};
use sp_keyring::AccountKeyring;
use sp_runtime::MultiSigner;
use std::{fmt::Debug, path::PathBuf, str, str::Utf8Error, sync::Arc, thread, time::Duration};
use std::{
fmt::Debug,
path::PathBuf,
str,
str::Utf8Error,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use substrate_api_client::ac_node_api::{EventRecord, Phase::ApplyExtrinsic};
use tokio::runtime::Handle;
use tokio::{runtime::Handle, task::JoinHandle};

const VERSION: &str = env!("CARGO_PKG_VERSION");

Expand Down Expand Up @@ -563,6 +574,10 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
(integritee_parentchain_handler, integritee_last_synced_header_at_last_run)
};

// some of the following threads need to be shut down gracefully.
let shutdown_flag = Arc::new(AtomicBool::new(false));
let mut sensitive_threads: Vec<thread::JoinHandle<()>> = Vec::new();

match WorkerModeProvider::worker_mode() {
WorkerMode::Teeracle => {
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -593,11 +608,13 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
)
.unwrap();

start_parentchain_header_subscription_thread(
let handle = start_parentchain_header_subscription_thread(
shutdown_flag.clone(),
integritee_parentchain_handler,
last_synced_header,
*shard,
);
sensitive_threads.push(handle);

info!("skipping shard vault check because not yet supported for offchain worker");
},
Expand All @@ -617,11 +634,13 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
integritee_last_synced_header_at_last_run
};

start_parentchain_header_subscription_thread(
let handle = start_parentchain_header_subscription_thread(
shutdown_flag.clone(),
integritee_parentchain_handler,
last_synced_header,
*shard,
);
sensitive_threads.push(handle);

spawn_worker_for_shard_polling(
shard,
Expand All @@ -632,27 +651,33 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(
}

let maybe_target_a_rpc_api = if let Some(url) = config.target_a_parentchain_rpc_endpoint() {
Some(init_target_parentchain(
let (api, mut handles) = init_target_parentchain(
&enclave,
&tee_accountid,
url,
shard,
ParentchainId::TargetA,
is_development_mode,
))
shutdown_flag.clone(),
);
sensitive_threads.append(&mut handles);
Some(api)
} else {
None
};

let maybe_target_b_rpc_api = if let Some(url) = config.target_b_parentchain_rpc_endpoint() {
Some(init_target_parentchain(
let (api, mut handles) = init_target_parentchain(
&enclave,
&tee_accountid,
url,
shard,
ParentchainId::TargetB,
is_development_mode,
))
shutdown_flag.clone(),
);
sensitive_threads.append(&mut handles);
Some(api)
} else {
None
};
Expand Down Expand Up @@ -688,14 +713,29 @@ fn start_worker<E, T, D, InitializationHandler, WorkerModeProvider>(

if WorkerModeProvider::worker_mode() == WorkerMode::Sidechain {
println!("[Integritee:SCV] starting block production");
let last_synced_header =
sidechain_init_block_production(enclave.clone(), sidechain_storage).unwrap();
let mut handles = sidechain_init_block_production(
enclave.clone(),
sidechain_storage,
shutdown_flag.clone(),
)
.unwrap();
sensitive_threads.append(&mut handles);
}

ita_parentchain_interface::event_subscriber::subscribe_to_parentchain_events(
&integritee_rpc_api,
ParentchainId::Integritee,
shutdown_flag.clone(),
);
println!(
"[!] waiting for {} sensitive threads to shut down gracefully",
sensitive_threads.len()
);
// Join each thread to ensure they have completed
for handle in sensitive_threads {
handle.join().expect("Thread panicked");
}
println!("[!] All threads stopped gracefully.");
}

fn init_provided_shard_vault<E: EnclaveBase>(
Expand Down Expand Up @@ -754,7 +794,8 @@ fn init_target_parentchain<E>(
shard: &ShardIdentifier,
parentchain_id: ParentchainId,
is_development_mode: bool,
) -> ParentchainApi
shutdown_flag: Arc<AtomicBool>,
) -> (ParentchainApi, Vec<thread::JoinHandle<()>>)
where
E: EnclaveBase + Sidechain,
{
Expand Down Expand Up @@ -785,6 +826,8 @@ where
// we ignore failure
let _ = enclave.init_shard_creation_parentchain_header(shard, &parentchain_id, &head);

let mut handles = Vec::new();

if WorkerModeProvider::worker_mode() != WorkerMode::Teeracle {
println!(
"[{:?}] Finished initializing light client, syncing parentchain...",
Expand All @@ -796,11 +839,13 @@ where
.sync_parentchain_until_latest_finalized(last_synched_header, *shard, true)
.unwrap();

start_parentchain_header_subscription_thread(
let handle = start_parentchain_header_subscription_thread(
shutdown_flag.clone(),
parentchain_handler.clone(),
last_synched_header,
*shard,
)
);
handles.push(handle);
}

let parentchain_init_params = parentchain_handler.parentchain_init_params.clone();
Expand All @@ -812,10 +857,11 @@ where
ita_parentchain_interface::event_subscriber::subscribe_to_parentchain_events(
&node_api_clone,
parentchain_id,
shutdown_flag,
)
})
.unwrap();
node_api
(node_api, handles)
}

fn init_parentchain<E>(
Expand Down Expand Up @@ -1018,30 +1064,35 @@ fn send_integritee_extrinsic(
}

fn start_parentchain_header_subscription_thread<E: EnclaveBase + Sidechain>(
shutdown_flag: Arc<AtomicBool>,
parentchain_handler: Arc<ParentchainHandler<ParentchainApi, E>>,
last_synced_header: Header,
shard: ShardIdentifier,
) {
) -> thread::JoinHandle<()> {
let parentchain_id = *parentchain_handler.parentchain_id();
thread::Builder::new()
.name(format!("{:?}_parentchain_sync_loop", parentchain_id))
.spawn(move || {
if let Err(e) =
subscribe_to_parentchain_new_headers(parentchain_handler, last_synced_header, shard)
{
if let Err(e) = subscribe_to_parentchain_new_headers(
shutdown_flag,
parentchain_handler,
last_synced_header,
shard,
) {
error!(
"[{:?}] parentchain block syncing terminated with a failure: {:?}",
parentchain_id, e
);
}
println!("[!] [{:?}] parentchain block syncing has terminated", parentchain_id);
})
.unwrap();
.unwrap()
}

/// Subscribe to the node API finalized heads stream and trigger a parent chain sync
/// upon receiving a new header.
fn subscribe_to_parentchain_new_headers<E: EnclaveBase + Sidechain>(
shutdown_flag: Arc<AtomicBool>,
parentchain_handler: Arc<ParentchainHandler<ParentchainApi, E>>,
mut last_synced_header: Header,
shard: ShardIdentifier,
Expand All @@ -1053,7 +1104,7 @@ fn subscribe_to_parentchain_new_headers<E: EnclaveBase + Sidechain>(
.subscribe_finalized_heads()
.map_err(Error::ApiClient)?;
let parentchain_id = parentchain_handler.parentchain_id();
loop {
while !shutdown_flag.load(Ordering::Relaxed) {
let new_header = subscription
.next()
.ok_or(Error::ApiSubscriptionDisconnected)?
Expand All @@ -1070,6 +1121,8 @@ fn subscribe_to_parentchain_new_headers<E: EnclaveBase + Sidechain>(
false,
)?;
}
warn!("[{:?}] parent chain block syncing has terminated", parentchain_id);
Ok(())
}

/// Get the public signing key of the TEE.
Expand Down
18 changes: 13 additions & 5 deletions service/src/sidechain_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use its_primitives::types::block::SignedBlock as SignedSidechainBlock;
use its_storage::{interface::FetchBlocks, start_sidechain_pruning_loop, BlockPruner};
use log::*;
use sp_runtime::{traits::IdentifyAccount, MultiSigner};
use std::{sync::Arc, thread};
use std::{
sync::{atomic::AtomicBool, Arc},
thread,
};
use teerex_primitives::AnySigner;
use tokio::runtime::Handle;

Expand Down Expand Up @@ -113,7 +116,8 @@ pub(crate) fn sidechain_start_untrusted_rpc_server<SidechainStorage>(
pub(crate) fn sidechain_init_block_production<Enclave, SidechainStorage>(
enclave: Arc<Enclave>,
sidechain_storage: Arc<SidechainStorage>,
) -> ServiceResult<()>
shutdown_flag: Arc<AtomicBool>,
) -> ServiceResult<Vec<thread::JoinHandle<()>>>
where
Enclave: EnclaveBase + Sidechain,
SidechainStorage: BlockPruner + FetchBlocks<SignedSidechainBlock> + Sync + Send + 'static,
Expand All @@ -126,12 +130,14 @@ where
// Start interval sidechain block production (execution of trusted calls, sidechain block production).
let sidechain_enclave_api = enclave;
println!("[+] Spawning thread for sidechain block production");
thread::Builder::new()
let local_shutdown_flag = shutdown_flag.clone();
let block_production_handle = thread::Builder::new()
.name("interval_block_production_timer".to_owned())
.spawn(move || {
let future = start_slot_worker(
|| execute_trusted_calls(sidechain_enclave_api.as_ref()),
SLOT_DURATION,
local_shutdown_flag,
);
block_on(future);
println!("[!] Sidechain block production loop has terminated");
Expand All @@ -140,18 +146,20 @@ where

// ------------------------------------------------------------------------
// start sidechain pruning loop
thread::Builder::new()
let pruning_handle = thread::Builder::new()
.name("sidechain_pruning_loop".to_owned())
.spawn(move || {
start_sidechain_pruning_loop(
&sidechain_storage,
SIDECHAIN_PURGE_INTERVAL,
SIDECHAIN_PURGE_LIMIT,
shutdown_flag,
);
println!("[!] Sidechain block pruning loop has terminated");
})
.map_err(|e| Error::Custom(Box::new(e)))?;

Ok(())
Ok([block_production_handle, pruning_handle].into())
}

/// Execute trusted operations in the enclave.
Expand Down
6 changes: 4 additions & 2 deletions sidechain/consensus/slots/src/slot_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@
//! provides generic functionality for slots.

use crate::time_until_next_slot;
use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, Ordering};
use futures_timer::Delay;
use std::time::Duration;

/// Executes given `task` repeatedly when the next slot becomes available.
pub async fn start_slot_worker<F>(task: F, slot_duration: Duration)
pub async fn start_slot_worker<F>(task: F, slot_duration: Duration, shutdown_flag: Arc<AtomicBool>)
where
F: Fn(),
{
let mut slot_stream = SlotStream::new(slot_duration);

loop {
while !shutdown_flag.load(Ordering::Relaxed) {
slot_stream.next_slot().await;
task();
}
Expand Down
Loading

0 comments on commit 98accb3

Please sign in to comment.