Skip to content

Commit

Permalink
Check graph-ready state for every block. (#1136)
Browse files Browse the repository at this point in the history
* Remove parental tree ready state.

* Check graph-ready state for every block.

* Rename.
  • Loading branch information
peilun-conflux authored and Peilun Li committed Mar 27, 2020
1 parent 3b37695 commit bc1b639
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 140 deletions.
299 changes: 159 additions & 140 deletions core/src/sync/synchronization_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ lazy_static! {
const NULL: usize = !0;
const BLOCK_INVALID: u8 = 0;
const BLOCK_HEADER_ONLY: u8 = 1;
const BLOCK_HEADER_PARENTAL_TREE_READY: u8 = 2;
const BLOCK_HEADER_GRAPH_READY: u8 = 3;
const BLOCK_GRAPH_READY: u8 = 4;

Expand Down Expand Up @@ -600,56 +599,99 @@ impl SynchronizationGraphInner {
me
}

fn new_to_be_header_parental_tree_ready(&self, index: usize) -> bool {
let ref node_me = self.arena[index];
if node_me.graph_status >= BLOCK_HEADER_PARENTAL_TREE_READY {
return false;
// TODO local_block_info is also loaded for invalid check, so maybe we can
// refactor code to avoid loading it twice.
fn is_graph_ready_in_db(
&self, parent_or_referee_hash: &H256, genesis_seq_num: u64,
) -> bool {
if let Some(info) = self
.data_man
.local_block_info_from_db(parent_or_referee_hash)
{
if info.get_status() == BlockStatus::Invalid {
false
} else {
info.get_seq_num() < genesis_seq_num
|| info.get_instance_id() == self.data_man.get_instance_id()
}
} else {
false
}

let parent = node_me.parent;
node_me.parent_reclaimed
|| (parent != NULL
&& self.arena[parent].graph_status
>= BLOCK_HEADER_PARENTAL_TREE_READY)
}

fn new_to_be_header_graph_ready(&self, index: usize) -> bool {
fn new_to_be_graph_ready(
&mut self, index: usize, minimal_status: u8,
) -> bool {
let ref node_me = self.arena[index];
if node_me.graph_status >= BLOCK_HEADER_GRAPH_READY {
// If a block has become graph-ready before and reclaimed,
// it will be marked as `already_processed`
// in `insert_block_header`, so we do not need to handle this case here.
// And thus we also won't propagate graph-ready to already processed
// blocks.
if node_me.graph_status >= minimal_status {
return false;
}

if node_me.pending_referee_count > 0 {
// FIXME: we may store `genesis_sequence_number` in data_man to avoid db
// access.
let genesis_hash = self.data_man.get_cur_consensus_era_genesis_hash();
let genesis_seq_num = self
.data_man
.local_block_info_from_db(&genesis_hash)
.expect("local_block_info for genesis must exist")
.get_seq_num();
let parent = self.arena[index].parent;
let parent_graph_ready = if parent == NULL {
self.arena[index].parent_reclaimed
|| self.is_graph_ready_in_db(
self.arena[index].block_header.parent_hash(),
genesis_seq_num,
)
} else {
self.arena[parent].graph_status >= minimal_status
};

if !parent_graph_ready {
return false;
} else if parent == NULL {
self.arena[index].parent_reclaimed = true;
}

let parent = node_me.parent;
(node_me.parent_reclaimed
|| (parent != NULL
&& self.arena[parent].graph_status >= BLOCK_HEADER_GRAPH_READY))
&& !node_me.referees.iter().any(|&referee| {
self.arena[referee].graph_status < BLOCK_HEADER_GRAPH_READY
})
}

fn new_to_be_block_graph_ready(&self, index: usize) -> bool {
let ref node_me = self.arena[index];
if !node_me.block_ready {
return false;
// check whether referees are `BLOCK_HEADER_GRAPH_READY`
// 1. referees which are in
// memory and status is BLOCK_HEADER_GRAPH_READY.
// 2. referees
// which are not in memory and not invalid in disk
// (assume these blocks are BLOCK_GRAPH_READY)
let mut referee_hash_in_mem = HashSet::new();
for referee in self.arena[index].referees.iter() {
if self.arena[*referee].graph_status < minimal_status {
return false;
} else {
referee_hash_in_mem
.insert(self.arena[*referee].block_header.hash());
}
}

if node_me.graph_status >= BLOCK_GRAPH_READY {
return false;
for referee_hash in self.arena[index].block_header.referee_hashes() {
if !referee_hash_in_mem.contains(referee_hash) {
if !self.is_graph_ready_in_db(referee_hash, genesis_seq_num) {
return false;
}
}
}

let parent = node_me.parent;
node_me.graph_status >= BLOCK_HEADER_GRAPH_READY
&& (node_me.parent_reclaimed
|| (parent != NULL
&& self.arena[parent].graph_status >= BLOCK_GRAPH_READY))
&& !node_me.referees.iter().any(|&referee| {
self.arena[referee].graph_status < BLOCK_GRAPH_READY
})
// parent and referees are all header graph ready.
true
}

fn new_to_be_header_graph_ready(&mut self, index: usize) -> bool {
self.new_to_be_graph_ready(index, BLOCK_HEADER_GRAPH_READY)
}

fn new_to_be_block_graph_ready(&mut self, index: usize) -> bool {
self.new_to_be_graph_ready(index, BLOCK_GRAPH_READY)
&& self.arena[index].block_ready
}

// Get parent (height, timestamp, gas_limit, difficulty)
Expand Down Expand Up @@ -1283,120 +1325,97 @@ impl SynchronizationGraph {
&mut invalid_set,
index,
);
} else {
if inner.new_to_be_header_graph_ready(index) {
inner.arena[index].graph_status = BLOCK_HEADER_GRAPH_READY;
inner.arena[index].last_update_timestamp = now;
debug!("BlockIndex {} parent_index {} hash {} is header graph ready", index,
} else if inner.new_to_be_header_graph_ready(index) {
inner.arena[index].graph_status = BLOCK_HEADER_GRAPH_READY;
inner.arena[index].last_update_timestamp = now;
debug!("BlockIndex {} parent_index {} hash {} is header graph ready", index,
inner.arena[index].parent, inner.arena[index].block_header.hash());

let r = inner.verify_header_graph_ready_block(index);

if need_to_verify && r.is_err() {
warn!(
"Invalid header_arc! inserted_header={:?} err={:?}",
inner.arena[index].block_header.clone(),
r
);
invalid_set.insert(index);
inner.arena[index].graph_status = BLOCK_INVALID;
inner.set_and_propagate_invalid(
&mut queue,
&mut invalid_set,
index,
);
continue;
}
let r = inner.verify_header_graph_ready_block(index);

// Note that when called by `insert_block_header` we have to
// insert header here immediately instead of
// after the loop because its children may
// become ready and being processed in the loop later. It
// requires this block already being inserted
// into the BlockDataManager!
if index == header_index_to_insert {
self.data_man.insert_block_header(
inner.arena[index].block_header.hash(),
inner.arena[index].block_header.clone(),
persistent,
);
}
if insert_to_consensus {
CONSENSUS_WORKER_QUEUE.enqueue(1);
*self.latest_graph_ready_block.lock() =
inner.arena[index].block_header.hash();

assert!(
self.new_block_hashes.send((
inner.arena[index].block_header.hash(),
true, /* ignore_body */
)),
"consensus receiver dropped"
);

// maintain not_ready_blocks_frontier
inner.not_ready_blocks_count -= 1;
inner.not_ready_blocks_frontier.remove(&index);
for child in &inner.arena[index].children {
inner.not_ready_blocks_frontier.insert(*child);
}
}
if need_to_verify && r.is_err() {
warn!(
"Invalid header_arc! inserted_header={:?} err={:?}",
inner.arena[index].block_header.clone(),
r
);
invalid_set.insert(index);
inner.arena[index].graph_status = BLOCK_INVALID;
inner.set_and_propagate_invalid(
&mut queue,
&mut invalid_set,
index,
);
continue;
}

// Passed verification on header_arc.
if inner.arena[index].block_ready {
need_to_relay
.push(inner.arena[index].block_header.hash());
}
// Note that when called by `insert_block_header` we have to
// insert header here immediately instead of
// after the loop because its children may
// become ready and being processed in the loop later. It
// requires this block already being inserted
// into the BlockDataManager!
if index == header_index_to_insert {
self.data_man.insert_block_header(
inner.arena[index].block_header.hash(),
inner.arena[index].block_header.clone(),
persistent,
);
}
if insert_to_consensus {
CONSENSUS_WORKER_QUEUE.enqueue(1);
*self.latest_graph_ready_block.lock() =
inner.arena[index].block_header.hash();

for child in &inner.arena[index].children {
if inner.arena[*child].graph_status
< BLOCK_HEADER_GRAPH_READY
{
queue.push_back(*child);
}
}
for referrer in &inner.arena[index].referrers {
if inner.arena[*referrer].graph_status
< BLOCK_HEADER_GRAPH_READY
{
queue.push_back(*referrer);
}
}
} else if inner.new_to_be_header_parental_tree_ready(index) {
debug!("BlockIndex {} parent_index {} hash {} is header parental tree ready", index,
inner.arena[index].parent, inner.arena[index].block_header.hash());
if index == header_index_to_insert {
self.data_man.insert_block_header(
assert!(
self.new_block_hashes.send((
inner.arena[index].block_header.hash(),
inner.arena[index].block_header.clone(),
persistent,
);
}
inner.arena[index].graph_status =
BLOCK_HEADER_PARENTAL_TREE_READY;
inner.arena[index].last_update_timestamp = now;
true, /* ignore_body */
)),
"consensus receiver dropped"
);

// maintain not_ready_blocks_frontier
inner.not_ready_blocks_count -= 1;
inner.not_ready_blocks_frontier.remove(&index);
for child in &inner.arena[index].children {
debug_assert!(
inner.arena[*child].graph_status
< BLOCK_HEADER_PARENTAL_TREE_READY
);
inner.not_ready_blocks_frontier.insert(*child);
}
}

// Passed verification on header_arc.
if inner.arena[index].block_ready {
need_to_relay.push(inner.arena[index].block_header.hash());
}

for child in &inner.arena[index].children {
if inner.arena[*child].graph_status
< BLOCK_HEADER_GRAPH_READY
{
queue.push_back(*child);
}
} else {
debug!(
"BlockIndex {} parent_index {} hash {} is not ready",
index,
inner.arena[index].parent,
inner.arena[index].block_header.hash()
);
if index == header_index_to_insert {
self.data_man.insert_block_header(
inner.arena[index].block_header.hash(),
inner.arena[index].block_header.clone(),
persistent,
);
}
for referrer in &inner.arena[index].referrers {
if inner.arena[*referrer].graph_status
< BLOCK_HEADER_GRAPH_READY
{
queue.push_back(*referrer);
}
}
} else {
debug!(
"BlockIndex {} parent_index {} hash {} is not ready",
index,
inner.arena[index].parent,
inner.arena[index].block_header.hash()
);
if index == header_index_to_insert {
self.data_man.insert_block_header(
inner.arena[index].block_header.hash(),
inner.arena[index].block_header.clone(),
persistent,
);
}
}
}
(invalid_set, need_to_relay)
Expand Down
1 change: 1 addition & 0 deletions core/src/sync/synchronization_protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,7 @@ impl SynchronizationProtocolHandler {
// it will cause inconsistency.
return Ok(());
}
// TODO This may not be needed now, but we should double check it.
let need_to_relay = self.graph.resolve_outside_dependencies(
false, /* recover_from_db */
self.insert_header_to_consensus(),
Expand Down

0 comments on commit bc1b639

Please sign in to comment.