diff --git a/src/task.rs b/src/task.rs index 265c77848..5c3c54c9f 100644 --- a/src/task.rs +++ b/src/task.rs @@ -341,43 +341,25 @@ impl> IpfsTask { Bootstrap(Err(BootstrapError::Timeout { .. })) => { warn!("kad: timed out while trying to bootstrap"); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(ret) = self.kad_subscriptions.remove(&id) { - let _ = ret.send(Err(anyhow::anyhow!( - "kad: timed out while trying to bootstrap" - ))); - } + if let Some(ret) = self.kad_subscriptions.remove(&id) { + let _ = ret.send(Err(anyhow::anyhow!( + "kad: timed out while trying to bootstrap" + ))); } } GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => { - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(ret) = self.kad_subscriptions.remove(&id) { - let _ = ret.send(Ok(KadResult::Peers( - peers.iter().map(|info| info.peer_id).collect(), - ))); - } - if let Ok(peer_id) = PeerId::from_bytes(&key) { - if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) { - if !peers.iter().any(|info| info.peer_id == peer_id) { - for ret in rets { - let _ = ret.send(Err(anyhow::anyhow!( - "Could not locate peer" - ))); - } + if let Some(ret) = self.kad_subscriptions.remove(&id) { + let _ = ret.send(Ok(KadResult::Peers( + peers.iter().map(|info| info.peer_id).collect(), + ))); + } + if let Ok(peer_id) = PeerId::from_bytes(&key) { + if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) { + if !peers.iter().any(|info| info.peer_id == peer_id) { + for ret in rets { + let _ = ret.send(Err(anyhow::anyhow!( + "Could not locate peer" + ))); } } } @@ -390,26 +372,17 @@ impl> IpfsTask { // don't mention the key here, as this is just the id of our node warn!("kad: timed out while trying to find all closest peers"); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(ret) = self.kad_subscriptions.remove(&id) { - let _ = ret.send(Err(anyhow::anyhow!( - "timed out while trying to find all closest peers" - ))); - } - if let Ok(peer_id) = PeerId::from_bytes(&key) { - if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) { - for ret in rets { - let _ = ret.send(Err(anyhow::anyhow!( - "timed out while trying to find all closest peers" - ))); - } + if let Some(ret) = self.kad_subscriptions.remove(&id) { + let _ = ret.send(Err(anyhow::anyhow!( + "timed out while trying to find all closest peers" + ))); + } + if let Ok(peer_id) = PeerId::from_bytes(&key) { + if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) { + for ret in rets { + let _ = ret.send(Err(anyhow::anyhow!( + "timed out while trying to find all closest peers" + ))); } } } @@ -438,17 +411,10 @@ impl> IpfsTask { let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out while trying to get providers for {}", key); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(ret) = self.kad_subscriptions.remove(&id) { - let _ = ret.send(Err(anyhow::anyhow!("timed out while trying to get providers for the given key"))); - } + if let Some(ret) = self.kad_subscriptions.remove(&id) { + let _ = ret.send(Err(anyhow::anyhow!( + "timed out while trying to get providers for the given key" + ))); } } StartProviding(Ok(AddProviderOk { key })) => { @@ -459,19 +425,10 @@ impl> IpfsTask { let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out while trying to provide {}", key); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(ret) = self.kad_subscriptions.remove(&id) { - let _ = ret.send(Err(anyhow::anyhow!( - "kad: timed out while trying to provide the record" - ))); - } + if let Some(ret) = self.kad_subscriptions.remove(&id) { + let _ = ret.send(Err(anyhow::anyhow!( + "kad: timed out while trying to provide the record" + ))); } } RepublishProvider(Ok(AddProviderOk { key })) => { @@ -503,17 +460,8 @@ impl> IpfsTask { let key = multibase::encode(Base::Base32Lower, key); warn!("kad: couldn't find record {}", key); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(tx) = self.record_stream.remove(&id) { - tx.close_channel(); - } + if let Some(tx) = self.record_stream.remove(&id) { + tx.close_channel(); } } GetRecord(Err(GetRecordError::QuorumFailed { @@ -527,34 +475,16 @@ impl> IpfsTask { quorum, key ); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(tx) = self.record_stream.remove(&id) { - tx.close_channel(); - } + if let Some(tx) = self.record_stream.remove(&id) { + tx.close_channel(); } } GetRecord(Err(GetRecordError::Timeout { key })) => { let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out while trying to get key {}", key); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(tx) = self.record_stream.remove(&id) { - tx.close_channel(); - } + if let Some(tx) = self.record_stream.remove(&id) { + tx.close_channel(); } } PutRecord(Ok(PutRecordOk { key })) @@ -578,19 +508,10 @@ impl> IpfsTask { quorum, key ); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(ret) = self.kad_subscriptions.remove(&id) { - let _ = ret.send(Err(anyhow::anyhow!( - "kad: quorum failed when trying to put the record" - ))); - } + if let Some(ret) = self.kad_subscriptions.remove(&id) { + let _ = ret.send(Err(anyhow::anyhow!( + "kad: quorum failed when trying to put the record" + ))); } } PutRecord(Err(PutRecordError::Timeout { @@ -601,20 +522,11 @@ impl> IpfsTask { let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out while trying to put record {}", key); - if self - .swarm - .behaviour() - .kademlia - .as_ref() - .and_then(|kad| kad.query(&id)) - .is_none() - { - if let Some(ret) = self.kad_subscriptions.remove(&id) { - let _ = ret.send(Err(anyhow::anyhow!( - "kad: timed out while trying to put record {}", - key - ))); - } + if let Some(ret) = self.kad_subscriptions.remove(&id) { + let _ = ret.send(Err(anyhow::anyhow!( + "kad: timed out while trying to put record {}", + key + ))); } } RepublishRecord(Err(PutRecordError::Timeout {