From 2d809b5993533e0f13d23b311fa4bfc03a48063c Mon Sep 17 00:00:00 2001 From: nickname32 <41116859+nickname32@users.noreply.github.com> Date: Sun, 30 Aug 2020 23:16:23 +0300 Subject: [PATCH] UDP dial fix (second candidate) --- cmd/Makefile | 2 +- go.mod | 1 + go.sum | 6 ++ protdial.go | 186 +++++++++++---------------------------------------- 4 files changed, 46 insertions(+), 149 deletions(-) diff --git a/cmd/Makefile b/cmd/Makefile index 9e7c174..7e7a9e3 100644 --- a/cmd/Makefile +++ b/cmd/Makefile @@ -71,7 +71,7 @@ run: run-tui run-tui: cd ./tui; $(GOCMD) run main.go $(ARGS) run-cli: - cd ./tui; $(GOCMD) run main.go $(ARGS) + cd ./cli; $(GOCMD) run main.go $(ARGS) deps: $(GOCMD) get diff --git a/go.mod b/go.mod index 536de39..ec85d65 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/libp2p/go-ws-transport v0.3.1 github.com/mattn/go-runewidth v0.0.9 // indirect github.com/nsf/termbox-go v0.0.0-20200418040025-38ba6e5628f1 // indirect + github.com/pion/udp v0.1.0 github.com/sparkymat/appdir v0.0.0-20190803090504-1c2ab64aee87 go.uber.org/zap v1.15.0 ) diff --git a/go.sum b/go.sum index 4348217..2759e41 100644 --- a/go.sum +++ b/go.sum @@ -534,6 +534,11 @@ github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/transport v0.10.0 h1:9M12BSneJm6ggGhJyWpDveFOstJsTiQjkLf4M44rm80= +github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE= +github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI= +github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -678,6 +683,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= diff --git a/protdial.go b/protdial.go index d6f8ea0..4fa26b6 100644 --- a/protdial.go +++ b/protdial.go @@ -8,11 +8,11 @@ import ( "math/rand" "net" "strconv" - "sync" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/pion/udp" ) const dialProtID protocol.ID = "/p2pforwarder/dial/1.0.0" @@ -86,34 +86,51 @@ func setDialHandler(f *Forwarder) { }) } +func createAddrInfoString(network string, listenip string, lport int, port int) string { + return network + ":" + listenip + ":" + strconv.Itoa(lport) + " -> " + network + ":" + strconv.Itoa(port) +} + func (f *Forwarder) dial(ctx context.Context, peerid peer.ID, protocolType byte, listenip string, port uint16) { + lport := int(port) + + var addressinfostr string + + var lnfunc func(lip net.IP, port int) (net.Listener, error) + switch protocolType { case protocolTypeTCP: - f.dialTCP(ctx, peerid, protocolType, listenip, port) + addressinfostr = createAddrInfoString("tcp", listenip, lport, int(port)) + + lnfunc = func(lip net.IP, port int) (net.Listener, error) { + return net.ListenTCP("tcp", &net.TCPAddr{ + IP: lip, + Port: port, + }) + } case protocolTypeUDP: - f.dialUDP(ctx, peerid, protocolType, listenip, port) + addressinfostr = createAddrInfoString("udp", listenip, lport, int(port)) + + lnfunc = func(lip net.IP, port int) (net.Listener, error) { + return udp.Listen("udp", &net.UDPAddr{ + IP: lip, + Port: port, + }) + } } -} -func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType byte, listenip string, port uint16) { - lport := int(port) - ln, err := net.ListenTCP("tcp", &net.TCPAddr{ - IP: net.ParseIP(listenip), - Port: lport, - }) + lip := net.ParseIP(listenip) + + ln, err := lnfunc(lip, lport) if err != nil { - onErrFn(fmt.Errorf("dialTCP: %s", err)) + onErrFn(fmt.Errorf("dial: %s", err)) for i := 0; i < 4; i++ { lport = rand.Intn(65535-1024) + 1024 - ln, err = net.ListenTCP("tcp", &net.TCPAddr{ - IP: net.ParseIP(listenip), - Port: lport, - }) + ln, err = lnfunc(lip, lport) if err != nil { - onErrFn(fmt.Errorf("dialTCP: %s", err)) + onErrFn(fmt.Errorf("dial: %s", err)) } else { break } @@ -124,16 +141,14 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by } } - addressstr := "tcp:" + listenip + ":" + strconv.Itoa(lport) + " -> " + "tcp:" + strconv.FormatUint(uint64(port), 10) - - onInfoFn("Listening " + addressstr) + onInfoFn("Listening " + addressinfostr) go func() { loop: for { conn, err := ln.Accept() if err != nil { - onErrFn(fmt.Errorf("dialTCP: %s", err)) + onErrFn(fmt.Errorf("dial: %s", err)) select { case <-ctx.Done(): break loop @@ -147,7 +162,7 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by s, err := f.host.NewStream(ctx, peerid, dialProtID) if err != nil { - onErrFn(fmt.Errorf("dialTCP: %s", err)) + onErrFn(fmt.Errorf("dial: %s", err)) return } defer s.Close() @@ -158,7 +173,7 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by _, err = s.Write(p) if err != nil { - onErrFn(fmt.Errorf("dialTCP: %s", err)) + onErrFn(fmt.Errorf("dial: %s", err)) return } @@ -170,132 +185,7 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by <-ctx.Done() ln.Close() - onInfoFn("Closed " + addressstr) -} - -type udpConnAddrWriter struct { - conn *net.UDPConn - addr *net.UDPAddr -} - -func (ucaw *udpConnAddrWriter) Write(p []byte) (int, error) { - return ucaw.conn.WriteToUDP(p, ucaw.addr) -} - -func (f *Forwarder) dialUDP(ctx context.Context, peerid peer.ID, protocolType byte, listenip string, port uint16) { - lport := int(port) - - conn, err := net.ListenUDP("udp", &net.UDPAddr{ - IP: net.ParseIP(listenip), - Port: lport, - }) - - if err != nil { - onErrFn(fmt.Errorf("dialUDP: %s", err)) - - for i := 0; i < 4; i++ { - lport = rand.Intn(65535-1024) + 1024 - - conn, err = net.ListenUDP("udp", &net.UDPAddr{ - IP: net.ParseIP(listenip), - Port: lport, - }) - - if err != nil { - onErrFn(fmt.Errorf("dialUDP: %s", err)) - } else { - break - } - } - - if err != nil { - return - } - } - - addressstr := "udp:" + listenip + ":" + strconv.Itoa(lport) + " -> " + "udp:" + strconv.FormatUint(uint64(port), 10) - - onInfoFn("Listening " + addressstr) - - var ( - buf = make([]byte, 1024) - conns = map[string]network.Stream{} - connsMux sync.Mutex - ) - go func() { - loop: - for { - select { - case <-ctx.Done(): - break loop - default: - n, udpaddr, err := conn.ReadFromUDP(buf) - if err != nil { - onErrFn(fmt.Errorf("dialUDP: %s", err)) - continue loop - } - - connsMux.Lock() - s, ok := conns[udpaddr.String()] - if !ok { - s, err = f.host.NewStream(ctx, peerid, dialProtID) - if err != nil { - connsMux.Unlock() - onErrFn(fmt.Errorf("dialUDP: %s", err)) - continue loop - } - - p := make([]byte, 3) - p[0] = protocolType - binary.BigEndian.PutUint16(p[1:3], port) - - _, err = s.Write(p) - if err != nil { - connsMux.Unlock() - s.Close() - onErrFn(fmt.Errorf("dialUDP: %s", err)) - continue loop - } - - conns[udpaddr.String()] = s - - go func() { - _, err := io.Copy(&udpConnAddrWriter{ - conn: conn, - addr: udpaddr, - }, s) - if err != nil { - onErrFn(fmt.Errorf("dialUDP: %s", err)) - } - - s.Close() - - connsMux.Lock() - delete(conns, udpaddr.String()) - connsMux.Unlock() - - }() - } - connsMux.Unlock() - - _, err = s.Write(buf[:n]) - if err != nil { - onErrFn(fmt.Errorf("dialUDP: %s", err)) - - s.Close() - - connsMux.Lock() - delete(conns, udpaddr.String()) - connsMux.Unlock() - } - } - } - }() - - <-ctx.Done() - conn.Close() - - onInfoFn("Closed " + addressstr) + onInfoFn("Closed " + addressinfostr) } func pipeBothIOs(ctx context.Context, a io.ReadWriter, b io.ReadWriter) {