CHIA-878: New peer implementation #611
base: main
Conversation
Pull Request Test Coverage Report for Build 10258000398 — 💛 Coveralls
This is still a work in progress. I'm trying to get rid of potential race conditions and deadlocks. |
As far as I can tell, there are no threads involved. The fact that you use `Arc` instead of `Rc` suggests that there are threads. If I'm not mistaken, I think you should use `Rc` instead.
```rust
pub fn new() -> Self {
    Self {
        items: Mutex::new(HashMap::new()),
        semaphore: Arc::new(Semaphore::new(u16::MAX as usize)),
```
Can you add a comment explaining why you initialize the semaphore to `u16::MAX`? It seems like an arbitrarily picked upper limit.
In the Chia protocol, message ids are 16-bit unsigned integers.
```rust
items: Mutex<HashMap<u16, Request>>,
semaphore: Arc<Semaphore>,
```
`Mutex` is a tokio mutex, to be used with coroutines. `Arc` is the standard library primitive, to be used with threads. It seems suspicious to mix these.
It would be good to add a comment explaining what the semaphore is used for. It's kind of an odd primitive that was used in the early days, before mutexes and condition variables were invented.
Why does this need to be held by an `Arc`? The whole `RequestMap` is already held by an `Arc`.
Because you can't get owned semaphore permits from a non-`Arc` `Semaphore`: https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html#method.acquire_owned
```rust
let permit = self
    .semaphore
    .clone()
    .acquire_owned()
    .await
    .expect("semaphore closed");
```
I imagine the semaphore is used to ensure no more than 65k coroutines enter this function simultaneously. Although the mutex lock below would serialize them all anyway, it ensures there's always a free slot in the hash map. This is subtle enough to warrant a comment, I think.
```rust
#[derive(Debug)]
pub struct RequestMap {
    items: Mutex<HashMap<u16, Request>>,
```
This isn't really used as a hash map. I imagine the reason you picked this data structure was to save space in the common case of not having many messages in flight. It looks like it could also be: `Mutex<[Option<Request>; u16::MAX]>`
That seems like it could allocate a lot of memory. Most of the time you only have one or two requests pending, so this whole system is for an edge case which is unlikely to happen anyway.
```rust
for i in 0..=u16::MAX {
    if !items.contains_key(&i) {
        index = Some(i);
        break;
    }
}
```
One simple optimization over a linear search is to store a `free_slot: Option<u16>` member, which is set every time you release a slot; before this linear search, you could just check whether that slot is available.
Another, more heavyweight, optimization would be to keep a stack of all free slots that you push and pop from, but given that the common case is (likely) to have few messages in flight, that is probably not worth it.
```rust
impl Drop for PeerInner {
    fn drop(&mut self) {
        self.inbound_handle.abort();
```
this looks a bit suspicious. If we haven't joined this coroutine by the time this is dropped, isn't it too late?
If you drop an `AbortHandle` to a task, the default behavior is to leave it running in the background. The more sensible behavior here is to just kill the task. That said, I think the underlying `Stream` might close if the `Sink` is destroyed, so this might not do much.
A new `Peer` implementation that:
- `PeerId`
- `IpAddr`
You can run the following examples to test this out:

```sh
RUST_LOG=info cargo run -p chia-client --example peer_connection
RUST_LOG=info cargo run -p chia-client --example peer_discovery
```