Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Use updated types from go-kademlia #889

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/plprobelab/go-kademlia/coord"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/routing"
"github.com/plprobelab/go-kademlia/routing/triert"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap/exp/zapslog"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
)

// ServiceName is used to scope incoming streams for the resource manager.
Expand Down Expand Up @@ -113,7 +113,7 @@ type Config struct {
Mode ModeOpt

// Kademlia holds the configuration of the underlying Kademlia implementation.
Kademlia *coord.Config
Kademlia *coord.CoordinatorConfig

// BucketSize determines the number of closer peers to return
BucketSize int
Expand All @@ -132,7 +132,7 @@ type Config struct {
// [triert.TrieRT] routing table will be used. This field will be nil
// in the default configuration because a routing table requires information
// about the local node.
RoutingTable routing.RoutingTableCpl[key.Key256, kad.NodeID[key.Key256]]
RoutingTable routing.RoutingTableCpl[kadt.Key, kadt.PeerID]

// The Backends field holds a map of key namespaces to their corresponding
// backend implementation. For example, if we received an IPNS record, the
Expand Down Expand Up @@ -190,10 +190,12 @@ type Config struct {
// some additional information to instantiate. The default values for these
// fields come from separate top-level methods prefixed with Default.
func DefaultConfig() *Config {
config, _ := coord.DefaultCoordinatorConfig() // TODO: err

return &Config{
Clock: clock.New(),
Mode: ModeOptAutoClient,
Kademlia: coord.DefaultConfig(),
Kademlia: config,
BucketSize: 20, // MAGIC
BootstrapPeers: DefaultBootstrapPeers(),
ProtocolID: ProtocolAmino,
Expand All @@ -211,9 +213,9 @@ func DefaultConfig() *Config {
// DefaultRoutingTable returns a triert.TrieRT routing table. This routing table
// cannot be initialized in [DefaultConfig] because it requires information
// about the local peer.
func DefaultRoutingTable(nodeID kad.NodeID[key.Key256]) (routing.RoutingTableCpl[key.Key256, kad.NodeID[key.Key256]], error) {
rtCfg := triert.DefaultConfig[key.Key256, kad.NodeID[key.Key256]]()
rt, err := triert.New[key.Key256, kad.NodeID[key.Key256]](nodeID, rtCfg)
func DefaultRoutingTable(nodeID kadt.PeerID) (routing.RoutingTableCpl[kadt.Key, kadt.PeerID], error) {
rtCfg := triert.DefaultConfig[kadt.Key, kadt.PeerID]()
rt, err := triert.New[kadt.Key, kadt.PeerID](nodeID, rtCfg)
if err != nil {
return nil, fmt.Errorf("new trie routing table: %w", err)
}
Expand Down
73 changes: 0 additions & 73 deletions v2/coord/conversion.go

This file was deleted.

69 changes: 29 additions & 40 deletions v2/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"

"github.com/benbjohnson/clock"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"github.com/plprobelab/go-kademlia/network/address"
Expand All @@ -20,7 +21,6 @@ import (
"go.uber.org/zap/exp/zapslog"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

Expand All @@ -41,7 +41,7 @@ type Coordinator struct {
cfg CoordinatorConfig

// rt is the routing table used to look up nodes by distance
rt kad.RoutingTable[KadKey, kad.NodeID[KadKey]]
rt kad.RoutingTable[kadt.Key, kadt.PeerID]

// rtr is the message router used to send messages
rtr Router
Expand Down Expand Up @@ -144,7 +144,7 @@ func DefaultCoordinatorConfig() (*CoordinatorConfig, error) {
}, nil
}

func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, kad.NodeID[KadKey]], cfg *CoordinatorConfig) (*Coordinator, error) {
func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
c, err := DefaultCoordinatorConfig()
if err != nil {
Expand All @@ -162,19 +162,19 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey,
qpCfg.QueryConcurrency = cfg.RequestConcurrency
qpCfg.RequestTimeout = cfg.RequestTimeout

qp, err := query.NewPool[KadKey, ma.Multiaddr](kadt.PeerID(self), qpCfg)
qp, err := query.NewPool[kadt.Key, kadt.PeerID](kadt.PeerID(self), qpCfg)
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
}
queryBehaviour := NewPooledQueryBehaviour(qp, cfg.Logger, cfg.Tele.Tracer)

bootstrapCfg := routing.DefaultBootstrapConfig[KadKey, ma.Multiaddr]()
bootstrapCfg := routing.DefaultBootstrapConfig[kadt.Key, kadt.PeerID]()
bootstrapCfg.Clock = cfg.Clock
bootstrapCfg.Timeout = cfg.QueryTimeout
bootstrapCfg.RequestConcurrency = cfg.RequestConcurrency
bootstrapCfg.RequestTimeout = cfg.RequestTimeout

bootstrap, err := routing.NewBootstrap[KadKey, ma.Multiaddr](kadt.PeerID(self), bootstrapCfg)
bootstrap, err := routing.NewBootstrap[kadt.Key, kadt.PeerID](kadt.PeerID(self), bootstrapCfg)
if err != nil {
return nil, fmt.Errorf("bootstrap: %w", err)
}
Expand All @@ -188,7 +188,7 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey,
// includeCfg.Concurrency = cfg.IncludeConcurrency
// includeCfg.Timeout = cfg.IncludeTimeout

include, err := routing.NewInclude[KadKey, ma.Multiaddr](rt, includeCfg)
include, err := routing.NewInclude[kadt.Key, kadt.PeerID](rt, includeCfg)
if err != nil {
return nil, fmt.Errorf("include: %w", err)
}
Expand All @@ -199,7 +199,7 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey,

// TODO: expose config
// probeCfg.Concurrency = cfg.ProbeConcurrency
probe, err := routing.NewProbe[KadKey, ma.Multiaddr](rt, probeCfg)
probe, err := routing.NewProbe[kadt.Key, kadt.PeerID](rt, probeCfg)
if err != nil {
return nil, fmt.Errorf("probe: %w", err)
}
Expand Down Expand Up @@ -241,15 +241,6 @@ func (c *Coordinator) ID() peer.ID {
return c.self
}

// Addresses returns the multiaddrs known for the local node, as reported by
// the coordinator's message router. It returns nil when the router lookup
// fails — callers cannot distinguish "no addresses" from a lookup error.
// NOTE(review): this method is deleted by this PR; the diff shows it only as
// a removal from coordinator.go.
func (c *Coordinator) Addresses() []ma.Multiaddr {
// TODO: return configured listen addresses
info, err := c.rtr.GetNodeInfo(context.TODO(), c.self)
if err != nil {
// Best-effort: the error is deliberately swallowed and nil returned.
return nil
}
return info.Addrs
}

// RoutingNotifications returns a channel that may be read to be notified of routing updates
func (c *Coordinator) RoutingNotifications() <-chan RoutingNotification {
return c.routingNotifications
Expand Down Expand Up @@ -310,19 +301,19 @@ func (c *Coordinator) GetNode(ctx context.Context, id peer.ID) (Node, error) {
return nil, ErrNodeNotFound
}

nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
nh, err := c.networkBehaviour.getNodeHandler(ctx, kadt.PeerID(id))
if err != nil {
return nil, err
}
return nh, nil
}

// GetClosestNodes requests the n closest nodes to the key from the node's local routing table.
func (c *Coordinator) GetClosestNodes(ctx context.Context, k KadKey, n int) ([]Node, error) {
func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]Node, error) {
closest := c.rt.NearestNodes(k, n)
nodes := make([]Node, 0, len(closest))
for _, id := range closest {
nh, err := c.networkBehaviour.getNodeHandler(ctx, NodeIDToPeerID(id))
nh, err := c.networkBehaviour.getNodeHandler(ctx, kadt.PeerID(id.ID()))
if err != nil {
return nil, err
}
Expand All @@ -333,7 +324,7 @@ func (c *Coordinator) GetClosestNodes(ctx context.Context, k KadKey, n int) ([]N

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k KadKey) (Value, error) {
// Unimplemented stub: calling GetValue currently panics unconditionally.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (Value, error) {
panic("not implemented")
}

Expand All @@ -344,24 +335,21 @@ func (c *Coordinator) PutValue(ctx context.Context, r Value, q int) error {
}

// Query traverses the DHT calling fn for each node visited.
func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (QueryStats, error) {
func (c *Coordinator) Query(ctx context.Context, target kadt.Key, fn QueryFunc) (QueryStats, error) {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.Query")
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, target, 20)
seeds, err := c.GetClosestNodes(ctx, target, 20) // TODO: 20
if err != nil {
return QueryStats{}, err
}

seedIDs := make([]peer.AddrInfo, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, peer.AddrInfo{
ID: s.ID(),
Addrs: s.Addresses(),
})
seedIDs := make([]kadt.PeerID, len(seeds))
for i, s := range seeds {
seedIDs[i] = kadt.PeerID(s.ID())
}

waiter := NewWaiter[BehaviourEvent]()
Expand Down Expand Up @@ -431,20 +419,19 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q
// AddNodes suggests new DHT nodes and their associated addresses to be added to the routing table.
// If the routing table is updated as a result of this operation an EventRoutingUpdated notification
// is emitted on the routing notification channel.
func (c *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error {
func (c *Coordinator) AddNodes(ctx context.Context, peerIDs []peer.ID, ttl time.Duration) error {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.AddNodes")
defer span.End()
for _, ai := range ais {
if ai.ID == c.self {

for _, peerID := range peerIDs {
if peerID == c.self {
// skip self
continue
}

// TODO: apply address filter

c.routingBehaviour.Notify(ctx, &EventAddAddrInfo{
NodeInfo: ai,
TTL: ttl,
c.routingBehaviour.Notify(ctx, &EventAddNode{
NodeID: kadt.PeerID(peerID),
TTL: ttl,
})

}
Expand All @@ -453,9 +440,11 @@ func (c *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl tim
}

// Bootstrap instructs the dht to begin bootstrapping the routing table.
func (c *Coordinator) Bootstrap(ctx context.Context, seeds []peer.AddrInfo) error {
func (c *Coordinator) Bootstrap(ctx context.Context, seeds []peer.ID) error {
seedStrs := make([]string, len(seeds))
seedPeerIDs := make([]kadt.PeerID, len(seeds))
for i, seed := range seeds {
seedPeerIDs[i] = kadt.PeerID(seed)
seedStrs[i] = seed.String()
}

Expand All @@ -465,7 +454,7 @@ func (c *Coordinator) Bootstrap(ctx context.Context, seeds []peer.AddrInfo) erro
c.routingBehaviour.Notify(ctx, &EventStartBootstrap{
// Bootstrap state machine uses the message
Message: &fakeMessage{key: kadt.PeerID(c.self).Key()},
SeedNodes: seeds,
SeedNodes: seedPeerIDs,
})

return nil
Expand Down
Loading
Loading