Skip to content

Commit

Permalink
chore: remove redundant query check
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Aug 28, 2024
1 parent 5cd6cf9 commit 6e0fc11
Showing 1 changed file with 50 additions and 138 deletions.
188 changes: 50 additions & 138 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,43 +341,25 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
Bootstrap(Err(BootstrapError::Timeout { .. })) => {
warn!("kad: timed out while trying to bootstrap");

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to bootstrap"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to bootstrap"
)));
}
}
GetClosestPeers(Ok(GetClosestPeersOk { key, peers })) => {
if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Ok(KadResult::Peers(
peers.iter().map(|info| info.peer_id).collect(),
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
if !peers.iter().any(|info| info.peer_id == peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"Could not locate peer"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Ok(KadResult::Peers(
peers.iter().map(|info| info.peer_id).collect(),
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
if !peers.iter().any(|info| info.peer_id == peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"Could not locate peer"
)));
}
}
}
Expand All @@ -390,26 +372,17 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
// don't mention the key here, as this is just the id of our node
warn!("kad: timed out while trying to find all closest peers");

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
if let Ok(peer_id) = PeerId::from_bytes(&key) {
if let Some(rets) = self.dht_peer_lookup.remove(&peer_id) {
for ret in rets {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to find all closest peers"
)));
}
}
}
Expand Down Expand Up @@ -438,17 +411,10 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to get providers for {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!("timed out while trying to get providers for the given key")));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"timed out while trying to get providers for the given key"
)));
}
}
StartProviding(Ok(AddProviderOk { key })) => {
Expand All @@ -459,19 +425,10 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to provide {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to provide the record"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to provide the record"
)));
}
}
RepublishProvider(Ok(AddProviderOk { key })) => {
Expand Down Expand Up @@ -503,17 +460,8 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: couldn't find record {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
}
GetRecord(Err(GetRecordError::QuorumFailed {
Expand All @@ -527,34 +475,16 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
quorum, key
);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
}
GetRecord(Err(GetRecordError::Timeout { key })) => {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to get key {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
if let Some(tx) = self.record_stream.remove(&id) {
tx.close_channel();
}
}
PutRecord(Ok(PutRecordOk { key }))
Expand All @@ -578,19 +508,10 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
quorum, key
);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: quorum failed when trying to put the record"
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: quorum failed when trying to put the record"
)));
}
}
PutRecord(Err(PutRecordError::Timeout {
Expand All @@ -601,20 +522,11 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void>> IpfsTask<C> {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to put record {}", key);

if self
.swarm
.behaviour()
.kademlia
.as_ref()
.and_then(|kad| kad.query(&id))
.is_none()
{
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to put record {}",
key
)));
}
if let Some(ret) = self.kad_subscriptions.remove(&id) {
let _ = ret.send(Err(anyhow::anyhow!(
"kad: timed out while trying to put record {}",
key
)));
}
}
RepublishRecord(Err(PutRecordError::Timeout {
Expand Down

0 comments on commit 6e0fc11

Please sign in to comment.