Skip to content

Commit

Permalink
chore: use AddrInfo instead of ID to match nwaku's libwaku ping function
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 28, 2024
1 parent c78b09d commit fc540c7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
8 changes: 5 additions & 3 deletions waku/v2/api/common/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

type Pinger interface {
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error)
}

type defaultPingImpl struct {
Expand All @@ -23,8 +24,9 @@ func NewDefaultPinger(host host.Host) Pinger {
}
}

func (d *defaultPingImpl) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
pingResultCh := ping.Ping(ctx, d.host, peerID)
func (d *defaultPingImpl) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) {
d.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL)
pingResultCh := ping.Ping(ctx, d.host, peerInfo.ID)
select {
case <-ctx.Done():
return 0, ctx.Err()
Expand Down
29 changes: 17 additions & 12 deletions waku/v2/api/history/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type peerStatus struct {

type StorenodeConfigProvider interface {
UseStorenodes() (bool, error)
GetPinnedStorenode() (peer.ID, error)
Storenodes() ([]peer.ID, error)
GetPinnedStorenode() (peer.AddrInfo, error)
Storenodes() ([]peer.AddrInfo, error)
}

type StorenodeCycle struct {
Expand Down Expand Up @@ -104,7 +104,7 @@ func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error
}

// If no pinned storenode, no need to disconnect and wait for it to be available
if pinnedStorenode == "" {
if pinnedStorenode.ID == "" {
m.disconnectActiveStorenode(graylistBackoff)
}

Expand Down Expand Up @@ -180,21 +180,26 @@ func poolSize(fleetSize int) int {
return int(math.Ceil(float64(fleetSize) / 4))
}

func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID {
func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.AddrInfo) []peer.AddrInfo {
peerIDToInfo := make(map[peer.ID]peer.AddrInfo)
for _, p := range allStorenodes {
peerIDToInfo[p.ID] = p
}

availableStorenodes := make(map[peer.ID]time.Duration)
availableStorenodesMutex := sync.Mutex{}
availableStorenodesWg := sync.WaitGroup{}
for _, storenode := range allStorenodes {
availableStorenodesWg.Add(1)
go func(peerID peer.ID) {
go func(peerInfo peer.AddrInfo) {
defer availableStorenodesWg.Done()
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

rtt, err := m.pinger.PingPeer(ctx, peerID)
rtt, err := m.pinger.PingPeer(ctx, peerInfo)
if err == nil { // pinging storenodes might fail, but we don't care
availableStorenodesMutex.Lock()
availableStorenodes[peerID] = rtt
availableStorenodes[peerInfo.ID] = rtt
availableStorenodesMutex.Unlock()
}
}(storenode)
Expand All @@ -209,7 +214,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
var sortedStorenodes []SortedStorenode
for storenodeID, rtt := range availableStorenodes {
sortedStorenode := SortedStorenode{
Storenode: storenodeID,
Storenode: peerIDToInfo[storenodeID],
RTT: rtt,
}
m.peersMutex.Lock()
Expand All @@ -222,7 +227,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
}
sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes))

result := make([]peer.ID, len(sortedStorenodes))
result := make([]peer.AddrInfo, len(sortedStorenodes))
for i, s := range sortedStorenodes {
result[i] = s.Storenode
}
Expand Down Expand Up @@ -252,8 +257,8 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
return err
}

if pinnedStorenode != "" {
return m.setActiveStorenode(pinnedStorenode)
if pinnedStorenode.ID != "" {
return m.setActiveStorenode(pinnedStorenode.ID)
}

m.logger.Info("Finding a new storenode..")
Expand Down Expand Up @@ -287,7 +292,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
}

ms := allStorenodes[r.Int64()]
return m.setActiveStorenode(ms)
return m.setActiveStorenode(ms.ID)
}

func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/history/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type SortedStorenode struct {
Storenode peer.ID
Storenode peer.AddrInfo
RTT time.Duration
CanConnectAfter time.Time
}
Expand Down

0 comments on commit fc540c7

Please sign in to comment.